From c7467a88bc16d031b922e4a99b5f6042c205f426 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Thu, 9 Feb 2017 19:12:12 +0100 Subject: [PATCH 1/8] BLOCKING_QUEUE: prepare by raising up max queue lenghts BLOCKING_QUEUE: Replaced ThreadQueue usage by ThreadBlockingQueue usage BLOCKING_QUEUE: instrument all push() with timeouts, showed some call have to be non-blocking... BLOCKING_QUEUE: tuned push()/try_push() --- CMakeLists.txt | 1 + src/CubicSDR.h | 2 +- src/audio/AudioThread.cpp | 2 + src/audio/AudioThread.h | 6 +- src/demod/DemodDefs.h | 10 +- src/demod/DemodulatorInstance.cpp | 8 + src/demod/DemodulatorPreThread.cpp | 21 +- src/demod/DemodulatorThread.cpp | 25 +-- src/demod/DemodulatorThread.h | 2 +- src/demod/DemodulatorWorkerThread.cpp | 3 +- src/demod/DemodulatorWorkerThread.h | 6 +- src/process/FFTDataDistributor.cpp | 4 +- src/process/FFTVisualDataThread.cpp | 2 - src/process/ScopeVisualProcessor.h | 2 +- src/process/SpectrumVisualDataThread.cpp | 2 +- src/process/SpectrumVisualProcessor.h | 2 +- src/process/VisualProcessor.h | 14 +- src/sdr/SDRPostThread.cpp | 71 ++---- src/sdr/SoapySDRThread.cpp | 2 +- src/sdr/SoapySDRThread.h | 4 +- src/util/ThreadBlockingQueue.cpp | 4 + src/util/ThreadBlockingQueue.h | 268 +++++++++++++++++++++++ 22 files changed, 354 insertions(+), 107 deletions(-) create mode 100644 src/util/ThreadBlockingQueue.cpp create mode 100644 src/util/ThreadBlockingQueue.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 5d9e058..ee0d4d8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -442,6 +442,7 @@ SET (cubicsdr_headers src/util/Gradient.h src/util/Timer.h src/util/ThreadQueue.h + src/util/ThreadBlockingQueue.h src/util/MouseTracker.h src/util/GLExt.h src/util/GLFont.h diff --git a/src/CubicSDR.h b/src/CubicSDR.h index 33bfa31..3dbbbc4 100644 --- a/src/CubicSDR.h +++ b/src/CubicSDR.h @@ -12,7 +12,7 @@ #include "GLExt.h" #include "PrimaryGLContext.h" -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "SoapySDRThread.h" #include "SDREnumerator.h" #include "SDRPostThread.h" diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index bb8b6d0..e0d4ad1 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -282,6 +282,7 @@ void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { AudioThreadCommand refreshDevice; refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE; refreshDevice.int_value = sampleRate; + //VSO : blocking push ! deviceController[deviceId]->getCommandQueue()->push(refreshDevice); } } @@ -479,6 +480,7 @@ void AudioThread::run() { void AudioThread::terminate() { IOThread::terminate(); AudioThreadCommand endCond; // push an empty input to bump the queue + //VSO: blocking push cmdQueue.push(endCond); } diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index 8eff8be..f472725 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -10,7 +10,7 @@ #include #include "AudioThread.h" -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "RtAudio.h" #include "DemodDefs.h" @@ -48,8 +48,8 @@ public: int int_value; }; -typedef ThreadQueue AudioThreadInputQueue; -typedef ThreadQueue AudioThreadCommandQueue; +typedef ThreadBlockingQueue AudioThreadInputQueue; +typedef ThreadBlockingQueue AudioThreadCommandQueue; class AudioThread : public IOThread { public: diff --git a/src/demod/DemodDefs.h b/src/demod/DemodDefs.h index 043ad41..d4a922a 100644 --- a/src/demod/DemodDefs.h +++ b/src/demod/DemodDefs.h @@ -3,10 +3,10 @@ #pragma once -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "CubicSDRDefs.h" #include "liquid/liquid.h" - +#include #include #include @@ -100,6 +100,6 @@ public: } }; -typedef ThreadQueue DemodulatorThreadInputQueue; -typedef ThreadQueue DemodulatorThreadPostInputQueue; -typedef ThreadQueue DemodulatorThreadControlCommandQueue; +typedef ThreadBlockingQueue DemodulatorThreadInputQueue; +typedef ThreadBlockingQueue DemodulatorThreadPostInputQueue; +typedef ThreadBlockingQueue DemodulatorThreadControlCommandQueue; diff --git a/src/demod/DemodulatorInstance.cpp b/src/demod/DemodulatorInstance.cpp index 61f4432..e8661ac 100644 --- a/src/demod/DemodulatorInstance.cpp +++ b/src/demod/DemodulatorInstance.cpp @@ -53,7 +53,9 @@ DemodulatorInstance::DemodulatorInstance() { user_label.store(new std::wstring()); pipeIQInputData = new DemodulatorThreadInputQueue; + pipeIQInputData->set_max_num_items(100); pipeIQDemodData = new DemodulatorThreadPostInputQueue; + pipeIQInputData->set_max_num_items(100); audioThread = new AudioThread(); @@ -62,7 +64,10 @@ DemodulatorInstance::DemodulatorInstance() { demodulatorPreThread->setOutputQueue("IQDataOutput",pipeIQDemodData); pipeAudioData = new AudioThreadInputQueue; + pipeAudioData->set_max_num_items(10); + threadQueueControl = new DemodulatorThreadControlCommandQueue; + threadQueueControl->set_max_num_items(2); demodulatorThread = new DemodulatorThread(this); demodulatorThread->setInputQueue("IQDataInput",pipeIQDemodData); @@ -241,6 +246,7 @@ void DemodulatorInstance::setActive(bool state) { void DemodulatorInstance::squelchAuto() { DemodulatorThreadControlCommand command; command.cmd = DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON; + //VSO: blocking push threadQueueControl->push(command); squelch = true; } @@ -257,6 +263,7 @@ void DemodulatorInstance::setSquelchEnabled(bool state) { } else if (state && !squelch) { DemodulatorThreadControlCommand command; command.cmd = DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON; + //VSO: blocking push! threadQueueControl->push(command); } @@ -292,6 +299,7 @@ void DemodulatorInstance::setOutputDevice(int device_id) { AudioThreadCommand command; command.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE; command.int_value = device_id; + //VSO: blocking push audioThread->getCommandQueue()->push(command); } setAudioSampleRate(AudioThread::deviceSampleRate[device_id]); diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index e4319a4..18fbff4 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -21,7 +21,10 @@ DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance *parent) : IOThre shiftFrequency = 0; workerQueue = new DemodulatorThreadWorkerCommandQueue; + workerQueue->set_max_num_items(2); + workerResults = new DemodulatorThreadWorkerResultQueue; + workerResults->set_max_num_items(100); workerThread = new DemodulatorWorkerThread(); workerThread->setInputQueue("WorkerCommandQueue",workerQueue); @@ -120,6 +123,7 @@ void DemodulatorPreThread::run() { } modemSettingsBuffered.clear(); modemSettingsChanged.store(false); + //VSO: blocking push workerQueue->push(command); cModem = nullptr; cModemKit = nullptr; @@ -140,6 +144,7 @@ void DemodulatorPreThread::run() { sampleRateChanged.store(false); audioSampleRateChanged.store(false); modemSettingsBuffered.clear(); + //VSO: blocking workerQueue->push(command); } @@ -209,11 +214,8 @@ void DemodulatorPreThread::run() { resamp->modemKit = cModemKit; resamp->sampleRate = currentBandwidth; - if (!iqOutputQueue->push(resamp)) { - resamp->setRefCount(0); - std::cout << "DemodulatorPreThread::run() cannot push resamp into iqOutputQueue, is full !" << std::endl; - std::this_thread::yield(); - } + //VSO: blocking push + iqOutputQueue->push(resamp); } inp->decRefCount(); @@ -343,11 +345,12 @@ int DemodulatorPreThread::getAudioSampleRate() { void DemodulatorPreThread::terminate() { IOThread::terminate(); DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue - if (!iqInputQueue->push(inp)) { - delete inp; - } - + + //VSO: blocking push : + iqInputQueue->push(inp); + DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL); + workerQueue->push(command); workerThread->terminate(); diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 16fa619..5d1f2db 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -298,23 +298,14 @@ void DemodulatorThread::run() { ati_vis->type = 0; } - if (!localAudioVisOutputQueue->push(ati_vis)) { - ati_vis->setRefCount(0); - std::cout << "DemodulatorThread::run() cannot push ati_vis into localAudioVisOutputQueue, is full !" << std::endl; - std::this_thread::yield(); - } + //non-blocking push for audio-out + localAudioVisOutputQueue->try_push(ati_vis); } - - + if (ati != nullptr) { if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) { - - if (!audioOutputQueue->push(ati)) { - ati->decRefCount(); - std::cout << "DemodulatorThread::run() cannot push ati into audioOutputQueue, is full !" << std::endl; - std::this_thread::yield(); - } - + //non-blocking push for audio-out + audioOutputQueue->push(ati); } else { ati->setRefCount(0); } @@ -367,9 +358,9 @@ void DemodulatorThread::run() { void DemodulatorThread::terminate() { IOThread::terminate(); DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue - if (!iqInputQueue->push(inp)) { - delete inp; - } + + //VSO: blocking push + iqInputQueue->push(inp); } bool DemodulatorThread::isMuted() { diff --git a/src/demod/DemodulatorThread.h b/src/demod/DemodulatorThread.h index 5709bb9..1df889c 100644 --- a/src/demod/DemodulatorThread.h +++ b/src/demod/DemodulatorThread.h @@ -10,7 +10,7 @@ #include "AudioThread.h" #include "Modem.h" -typedef ThreadQueue DemodulatorThreadOutputQueue; +typedef ThreadBlockingQueue DemodulatorThreadOutputQueue; #define DEMOD_VIS_SIZE 2048 #define DEMOD_SIGNAL_MIN -30 diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index 2c5c896..b1304a7 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -101,11 +101,10 @@ void DemodulatorWorkerThread::run() { result.modemType = cModemType; result.modemName = cModemName; + //VSO: blocking push resultQueue->push(result); } - } - // std::cout << "Demodulator worker thread done." << std::endl; } diff --git a/src/demod/DemodulatorWorkerThread.h b/src/demod/DemodulatorWorkerThread.h index e0b4065..cb56176 100644 --- a/src/demod/DemodulatorWorkerThread.h +++ b/src/demod/DemodulatorWorkerThread.h @@ -8,7 +8,7 @@ #include "liquid/liquid.h" #include "AudioThread.h" -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "CubicSDRDefs.h" #include "Modem.h" @@ -69,8 +69,8 @@ public: ModemSettings settings; }; -typedef ThreadQueue DemodulatorThreadWorkerCommandQueue; -typedef ThreadQueue DemodulatorThreadWorkerResultQueue; +typedef ThreadBlockingQueue DemodulatorThreadWorkerCommandQueue; +typedef ThreadBlockingQueue DemodulatorThreadWorkerResultQueue; class DemodulatorWorkerThread : public IOThread { public: diff --git a/src/process/FFTDataDistributor.cpp b/src/process/FFTDataDistributor.cpp index 7a3c66d..8636b70 100644 --- a/src/process/FFTDataDistributor.cpp +++ b/src/process/FFTDataDistributor.cpp @@ -3,6 +3,7 @@ #include "FFTDataDistributor.h" #include +#include FFTDataDistributor::FFTDataDistributor() : outputBuffers("FFTDataDistributorBuffers"), fftSize(DEFAULT_FFT_SIZE), linesPerSecond(DEFAULT_WATERFALL_LPS), lineRateAccum(0.0) { @@ -109,7 +110,8 @@ void FFTDataDistributor::process() { outp->sampleRate = inputBuffer.sampleRate; outp->data.assign(inputBuffer.data.begin()+bufferOffset+i, inputBuffer.data.begin()+bufferOffset+i+ fftSize); - distribute(outp); + //authorize distribute with losses + distribute(outp, NON_BLOCKING_TIMEOUT); while (lineRateAccum >= 1.0) { lineRateAccum -= 1.0; diff --git a/src/process/FFTVisualDataThread.cpp b/src/process/FFTVisualDataThread.cpp index 44e303d..c0c820e 100644 --- a/src/process/FFTVisualDataThread.cpp +++ b/src/process/FFTVisualDataThread.cpp @@ -53,7 +53,6 @@ void FFTVisualDataThread::run() { //this if fed by FFTDataDistributor which has a buffer of FFT_DISTRIBUTOR_BUFFER_IN_SECONDS //so sleep for << FFT_DISTRIBUTOR_BUFFER_IN_SECONDS not to be overflown std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0))); -// std::this_thread::yield(); int fftSize = wproc.getDesiredInputSize(); @@ -65,7 +64,6 @@ void FFTVisualDataThread::run() { if (lpsChanged.load()) { fftDistrib.setLinesPerSecond(linesPerSecond.load()); -// pipeIQDataIn->set_max_num_items(linesPerSecond.load()); lpsChanged.store(false); } diff --git a/src/process/ScopeVisualProcessor.h b/src/process/ScopeVisualProcessor.h index de0732c..d5ba64c 100644 --- a/src/process/ScopeVisualProcessor.h +++ b/src/process/ScopeVisualProcessor.h @@ -19,7 +19,7 @@ public: double fft_floor, fft_ceil; }; -typedef ThreadQueue ScopeRenderDataQueue; +typedef ThreadBlockingQueue ScopeRenderDataQueue; class ScopeVisualProcessor : public VisualProcessor { public: diff --git a/src/process/SpectrumVisualDataThread.cpp b/src/process/SpectrumVisualDataThread.cpp index d29796f..1e954a2 100644 --- a/src/process/SpectrumVisualDataThread.cpp +++ b/src/process/SpectrumVisualDataThread.cpp @@ -20,7 +20,7 @@ void SpectrumVisualDataThread::run() { while(!stopping) { //this if fed by FFTDataDistributor which has a buffer of FFT_DISTRIBUTOR_BUFFER_IN_SECONDS //so sleep for << FFT_DISTRIBUTOR_BUFFER_IN_SECONDS not to be overflown - std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0))); + std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0))); sproc.run(); } diff --git a/src/process/SpectrumVisualProcessor.h b/src/process/SpectrumVisualProcessor.h index 44467c2..bef6490 100644 --- a/src/process/SpectrumVisualProcessor.h +++ b/src/process/SpectrumVisualProcessor.h @@ -19,7 +19,7 @@ public: int bandwidth; }; -typedef ThreadQueue SpectrumVisualDataQueue; +typedef ThreadBlockingQueue SpectrumVisualDataQueue; class SpectrumVisualProcessor : public VisualProcessor { public: diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index 1db0b11..e72e1ed 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -4,7 +4,7 @@ #pragma once #include "CubicSDRDefs.h" -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "IOThread.h" #include #include @@ -12,8 +12,8 @@ template class VisualProcessor { // - typedef ThreadQueue VisualInputQueueType; - typedef ThreadQueue VisualOutputQueueType; + typedef typename ThreadBlockingQueue VisualInputQueueType; + typedef typename ThreadBlockingQueue VisualOutputQueueType; typedef typename std::vector< VisualOutputQueueType *>::iterator outputs_i; public: virtual ~VisualProcessor() { @@ -94,7 +94,9 @@ protected: //To be used by derived classes implementing //process() : will dispatch 'item' into as many //available outputs, previously set by attachOutput(). - void distribute(OutputDataType *item) { + //* \param[in] timeout The number of microseconds to wait to push an item in each one of the outputs, 0(default) means indefinite wait. + //* \param[in] errorMessage an error message written on std::cout in case pf push timeout. + void distribute(OutputDataType *item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") { std::lock_guard < std::recursive_mutex > busy_lock(busy_update); //We will try to distribute 'output' among all 'outputs', @@ -103,11 +105,11 @@ protected: item->setRefCount((int)outputs.size()); for (outputs_i it = outputs.begin(); it != outputs.end(); it++) { //if 'output' failed to be given to an outputs_i, dec its ref count accordingly. - if (!(*it)->push(item)) { + //blocking push, with a timeout + if (!(*it)->push(item, timeout, errorMessage)) { item->decRefCount(); } } - // Now 'item' refcount matches the times 'item' has been successfully distributed, //i.e shared among the outputs. } diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index f7b1c18..afefd60 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -108,12 +108,6 @@ void SDRPostThread::updateActiveDemodulators() { // deactivate if active if (demod->isActive() && !demod->isFollow() && !demod->isTracking()) { demod->setActive(false); - // DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData; - // dummyDataOut->frequency = frequency; - // dummyDataOut->sampleRate = sampleRate; - // if (!demodQueue->push(dummyDataOut)) { - // delete dummyDataOut; - // } } // follow if follow mode @@ -237,9 +231,8 @@ void SDRPostThread::run() { void SDRPostThread::terminate() { IOThread::terminate(); SDRThreadIQData *dummy = new SDRThreadIQData; - if (!iqDataInQueue->push(dummy)) { - delete dummy; - } + //VSO: blocking push + iqDataInQueue->push(dummy); } void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { @@ -300,34 +293,23 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], dataSize, &demodDataOut->data[0]); if (doDemodVisOut) { - if (!iqActiveDemodVisualQueue->push(demodDataOut)) { - demodDataOut->decRefCount(); - std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl; - std::this_thread::yield(); - } + //VSO: blocking push + iqActiveDemodVisualQueue->push(demodDataOut); } if (doIQDataOut) { - if (!iqDataOutQueue->push(demodDataOut)) { - demodDataOut->decRefCount(); - std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqDataOutQueue, is full !" << std::endl; - std::this_thread::yield(); - } + //VSO: blocking push + iqDataOutQueue->push(demodDataOut); } if (doVisOut) { - if (!iqVisualQueue->push(demodDataOut)) { - demodDataOut->decRefCount(); - std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqVisualQueue, is full !" << std::endl; - std::this_thread::yield(); - } + //VSO: blocking push + iqVisualQueue->push(demodDataOut); } for (size_t i = 0; i < nRunDemods; i++) { - if (!runDemods[i]->getIQInputDataPipe()->push(demodDataOut)) { - demodDataOut->decRefCount(); - std::this_thread::yield(); - } + //VSO: blocking push + runDemods[i]->getIQInputDataPipe()->push(demodDataOut); } } } @@ -365,19 +347,12 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { iqDataOut->sampleRate = data_in->sampleRate; iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize); - if (!iqDataOutQueue->push(iqDataOut)) { - std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqDataOutQueue, is full !" << std::endl; - iqDataOut->decRefCount(); - std::this_thread::yield(); - } - - - if (doVis) { - if (!iqVisualQueue->push(iqDataOut)) { - std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqVisualQueue, is full !" << std::endl; - iqDataOut->decRefCount(); - std::this_thread::yield(); - } + //VSO: blocking push + iqDataOutQueue->push(iqDataOut); + + if (doVis) { + //VSO: blocking push + iqVisualQueue->push(iqDataOut); } } @@ -473,21 +448,15 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { } if (doDemodVis) { - if (!iqActiveDemodVisualQueue->push(demodDataOut)) { - std::cout << "SDRPostThread::runPFBCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl; - demodDataOut->decRefCount(); - std::this_thread::yield(); - } + //VSO: blocking push + iqActiveDemodVisualQueue->push(demodDataOut); } for (size_t j = 0; j < nRunDemods; j++) { if (demodChannel[j] == i) { DemodulatorInstance *demod = runDemods[j]; - - if (!demod->getIQInputDataPipe()->push(demodDataOut)) { - demodDataOut->decRefCount(); - std::this_thread::yield(); - } + //VSO: blocking push + demod->getIQInputDataPipe()->push(demodDataOut); } } } diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index 0c1e36f..dd7938c 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -251,7 +251,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { dataOut->dcCorrected = hasHardwareDC.load(); dataOut->numChannels = numChannels.load(); - if (!iqDataOutQueue->push(dataOut)) { + if (!iqDataOutQueue->try_push(dataOut)) { //The rest of the system saturates, //finally the push didn't suceeded, recycle dataOut immediatly. dataOut->setRefCount(0); diff --git a/src/sdr/SoapySDRThread.h b/src/sdr/SoapySDRThread.h index 794b837..2982ac1 100644 --- a/src/sdr/SoapySDRThread.h +++ b/src/sdr/SoapySDRThread.h @@ -5,7 +5,7 @@ #include -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "DemodulatorMgr.h" #include "SDRDeviceInfo.h" #include "AppConfig.h" @@ -39,7 +39,7 @@ public: } }; -typedef ThreadQueue SDRThreadIQDataQueue; +typedef ThreadBlockingQueue SDRThreadIQDataQueue; class SDRThread : public IOThread { private: diff --git a/src/util/ThreadBlockingQueue.cpp b/src/util/ThreadBlockingQueue.cpp new file mode 100644 index 0000000..68f23c3 --- /dev/null +++ b/src/util/ThreadBlockingQueue.cpp @@ -0,0 +1,4 @@ +// Copyright (c) Charles J. Cliffe +// SPDX-License-Identifier: GPL-2.0+ + +#include \ No newline at end of file diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h new file mode 100644 index 0000000..10fad27 --- /dev/null +++ b/src/util/ThreadBlockingQueue.h @@ -0,0 +1,268 @@ +// Copyright (c) Charles J. Cliffe +// SPDX-License-Identifier: GPL-2.0+ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#define MIN_ITEM_NB (1) + +//use this timeout constant in either pop() or push() calls to indicate +// a non-blocking operation, so respectively equivalent to try_pop() and try_push() +#define NON_BLOCKING_TIMEOUT (100) + +//use this timeout constant in either pop() or push() calls to indicate +//an indefnite timeout duration. +#define BLOCKING_INFINITE_TIMEOUT (0) + +/** A thread-safe asynchronous blocking queue */ +template +class ThreadBlockingQueue : public ThreadQueueBase { + + typedef typename std::deque::value_type value_type; + typedef typename std::deque::size_type size_type; + +public: + + + /*! Create safe blocking queue. */ + ThreadBlockingQueue() { + //at least 1 (== Exchanger) + m_max_num_items = MIN_ITEM_NB; + }; + + ThreadBlockingQueue(const ThreadBlockingQueue& sq) { + std::lock_guard < std::mutex > lock(sq.m_mutex); + m_queue = sq.m_queue; + m_max_num_items = sq.m_max_num_items; + } + + /*! Destroy safe queue. */ + ~ThreadBlockingQueue() { + std::lock_guard < std::mutex > lock(m_mutex); + } + + /** + * Sets the maximum number of items in the queue. Real value is clamped + * to 1 on the lower bound. + * \param[in] nb max of items + */ + void set_max_num_items(unsigned int max_num_items) { + std::lock_guard < std::mutex > lock(m_mutex); + + if (max_num_items > m_max_num_items) { + //Only raise the existing max size, never squash it + //for simplification sake at runtime. + m_max_num_items = max_num_items; + m_cond_not_full.notify_all(); + } + } + + /** + * Pushes the item into the queue. If the queue is full, waits until room + * is available, for at most timeout microseconds. + * \param[in] item An item. + * \param[in] timeout a max waiting timeout in microseconds for an item to be pushed. + * by default, = 0 means indefinite wait. + * \param[in] errorMessage an error message written on std::cout in case of the timeout wait + * \return true if an item was pushed into the queue, else a timeout has occured. + */ + bool push(const value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT,const char* errorMessage = "") { + std::unique_lock < std::mutex > lock(m_mutex); + + if (timeout == BLOCKING_INFINITE_TIMEOUT) { + m_cond_not_full.wait(lock, [this]() // Lambda funct + { + return m_queue.size() < m_max_num_items; + }); + } else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.size() >= m_max_num_items) { + // if the value is below a threshold, consider it is a try_push() + return false; + } + else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout), + [this]() { return m_queue.size() < m_max_num_items; })) { + std::cout << "WARNING: Thread 0x" << std::hex << std::this_thread::get_id() << std::dec << + " executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << + (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; + return false; + } + + m_queue.push_back(item); + m_cond_not_empty.notify_all(); + return true; + } + + /** + * Try to pushes the item into the queue, immediatly, without waiting. If the queue is full, the item + * is not inserted and the function returns false. + * \param[in] item An item. + */ + bool try_push(const value_type& item) { + std::lock_guard < std::mutex > lock(m_mutex); + + if (m_queue.size() >= m_max_num_items) { + return false; + } + + m_queue.push_back(item); + m_cond_not_empty.notify_all(); + return true; + } + + /** + * Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available. + * \param[in] timeout The number of microseconds to wait. O (default) means indefinite wait. + * \param[in] errorMessage an error message written on std::cout in case of the timeout wait + * \return true if get an item from the queue, false if no item is received before the timeout. + */ + bool pop(value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") { + std::unique_lock < std::mutex > lock(m_mutex); + + if (timeout == BLOCKING_INFINITE_TIMEOUT) { + m_cond_not_empty.wait(lock, [this]() // Lambda funct + { + return !m_queue.empty(); + }); + } else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.empty()) { + // if the value is below a threshold, consider it is try_pop() + return false; + } + else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout), + [this]() { return !m_queue.empty(); })) { + std::cout << "WARNING: Thread 0x" << std::hex << std::this_thread::get_id() << std::dec << + " executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << + (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; + return false; + } + + item = m_queue.front(); + m_queue.pop_front(); + m_cond_not_full.notify_all(); + return true; + } + + /** + * Tries to pop item from the queue. + * \param[out] item The item. + * \return False is returned if no item is available. + */ + bool try_pop(value_type& item) { + std::lock_guard < std::mutex > lock(m_mutex); + + if (m_queue.empty()) { + return false; + } + + item = m_queue.front(); + m_queue.pop_front(); + m_cond_not_full.notify_all(); + return true; + } + + + /** + * Gets the number of items in the queue. + * \return Number of items in the queue. + */ + size_type size() const { + std::lock_guard < std::mutex > lock(m_mutex); + return m_queue.size(); + } + + /** + * Check if the queue is empty. + * \return true if queue is empty. + */ + bool empty() const { + std::lock_guard < std::mutex > lock(m_mutex); + return m_queue.empty(); + } + + /** + * Check if the queue is full. + * \return true if queue is full. + */ + bool full() const { + std::lock_guard < std::mutex > lock(m_mutex); + return (m_queue.size() >= m_max_num_items); + } + + /** + * Remove any items in the queue. + */ + void flush() { + std::lock_guard < std::mutex > lock(m_mutex); + m_queue.clear(); + m_cond_not_full.notify_all(); + } + + /** + * Swaps the contents. + * \param[out] sq The ThreadBlockingQueue to swap with 'this'. + */ + void swap(ThreadBlockingQueue& sq) { + if (this != &sq) { + std::lock_guard < std::mutex > lock1(m_mutex); + std::lock_guard < std::mutex > lock2(sq.m_mutex); + m_queue.swap(sq.m_queue); + std::swap(m_max_num_items, sq.m_max_num_items); + + if (!m_queue.empty()) { + m_cond_not_empty.notify_all(); + } + + if (!sq.m_queue.empty()) { + sq.m_cond_not_empty.notify_all(); + } + + if (!m_queue.full()) { + m_cond_not_full.notify_all(); + } + + if (!sq.m_queue.full()) { + sq.m_cond_not_full.notify_all(); + } + } + } + + /*! The copy assignment operator */ + ThreadBlockingQueue& operator=(const ThreadBlockingQueue& sq) { + if (this != &sq) { + std::lock_guard < std::mutex > lock1(m_mutex); + std::lock_guard < std::mutex > lock2(sq.m_mutex); + + m_queue = sq.m_queue + m_max_num_items = sq.m_max_num_items; + + if (!m_queue.empty()) { + m_cond_not_empty.notify_all(); + } + + if (!m_queue.full()) { + m_cond_not_full.notify_all(); + } + } + return *this; + } + +private: + //TODO: use a circular buffer structure ? (fixed array + modulo) + std::deque m_queue; + + mutable std::mutex m_mutex; + std::condition_variable m_cond_not_empty; + std::condition_variable m_cond_not_full; + size_t m_max_num_items = MIN_ITEM_NB; +}; + +/*! Swaps the contents of two ThreadBlockingQueue objects. (external operator) */ +template +void swap(ThreadBlockingQueue& q1, ThreadBlockingQueue& q2) { + q1.swap(q2); +} From 25f7ba386d23de4923a7f69a6047f5cc2b8adfb7 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Sun, 12 Feb 2017 16:48:04 +0100 Subject: [PATCH 2/8] BLOCKING_QUEUE: remove ThreadBlockingQueue dependency to ThreadQueue, remove ThreadQueue.h from CMakeLists --- CMakeLists.txt | 1 - src/IOThread.h | 2 +- src/audio/AudioThread.h | 1 - src/util/ThreadBlockingQueue.h | 9 ++++++--- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ee0d4d8..2a0906c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -441,7 +441,6 @@ SET (cubicsdr_headers src/audio/AudioThread.h src/util/Gradient.h src/util/Timer.h - src/util/ThreadQueue.h src/util/ThreadBlockingQueue.h src/util/MouseTracker.h src/util/GLExt.h diff --git a/src/IOThread.h b/src/IOThread.h index 9750366..46ce897 100644 --- a/src/IOThread.h +++ b/src/IOThread.h @@ -12,7 +12,7 @@ #include #include -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "Timer.h" struct map_string_less : public std::binary_function diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index f472725..ce349c0 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -9,7 +9,6 @@ #include #include -#include "AudioThread.h" #include "ThreadBlockingQueue.h" #include "RtAudio.h" #include "DemodDefs.h" diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h index 10fad27..ab95d93 100644 --- a/src/util/ThreadBlockingQueue.h +++ b/src/util/ThreadBlockingQueue.h @@ -9,7 +9,7 @@ #include #include #include -#include +#include #define MIN_ITEM_NB (1) @@ -21,6 +21,9 @@ //an indefnite timeout duration. #define BLOCKING_INFINITE_TIMEOUT (0) +class ThreadQueueBase { +}; + /** A thread-safe asynchronous blocking queue */ template class ThreadBlockingQueue : public ThreadQueueBase { @@ -29,7 +32,6 @@ class ThreadBlockingQueue : public ThreadQueueBase { typedef typename std::deque::size_type size_type; public: - /*! Create safe blocking queue. */ ThreadBlockingQueue() { @@ -37,6 +39,7 @@ public: m_max_num_items = MIN_ITEM_NB; }; + //Copy constructor ThreadBlockingQueue(const ThreadBlockingQueue& sq) { std::lock_guard < std::mutex > lock(sq.m_mutex); m_queue = sq.m_queue; @@ -57,7 +60,7 @@ public: std::lock_guard < std::mutex > lock(m_mutex); if (max_num_items > m_max_num_items) { - //Only raise the existing max size, never squash it + //Only raise the existing max size, never reduce it //for simplification sake at runtime. m_max_num_items = max_num_items; m_cond_not_full.notify_all(); From 7d412eccb685349a79f55120c0531cf64479c6ed Mon Sep 17 00:00:00 2001 From: vsonnier Date: Mon, 13 Feb 2017 21:51:46 +0100 Subject: [PATCH 3/8] BLOCKING_QUEUE: display current Thread id in both hex and decimal so both worlds are happy (GDB and Visual Studio) --- src/util/ThreadBlockingQueue.h | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h index ab95d93..d78b923 100644 --- a/src/util/ThreadBlockingQueue.h +++ b/src/util/ThreadBlockingQueue.h @@ -90,8 +90,9 @@ public: } else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout), [this]() { return m_queue.size() < m_max_num_items; })) { - std::cout << "WARNING: Thread 0x" << std::hex << std::this_thread::get_id() << std::dec << - " executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << + std::thread::id currentThreadId = std::this_thread::get_id(); + std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << + " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; return false; } @@ -138,9 +139,10 @@ public: } else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout), [this]() { return !m_queue.empty(); })) { - std::cout << "WARNING: Thread 0x" << std::hex << std::this_thread::get_id() << std::dec << - " executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << - (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; + std::thread::id currentThreadId = std::this_thread::get_id(); + std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << + " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << + (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; return false; } From 65c1722bca4a37dd52e579ed9821b89f49a29670 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Tue, 14 Feb 2017 18:39:14 +0100 Subject: [PATCH 4/8] FIX: VisualProcessor compilation on Linux targets (#514), ThreadBlockingQueue version... --- src/process/VisualProcessor.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index e72e1ed..364dc57 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -12,8 +12,8 @@ template class VisualProcessor { // - typedef typename ThreadBlockingQueue VisualInputQueueType; - typedef typename ThreadBlockingQueue VisualOutputQueueType; + typedef ThreadBlockingQueue VisualInputQueueType; + typedef ThreadBlockingQueue VisualOutputQueueType; typedef typename std::vector< VisualOutputQueueType *>::iterator outputs_i; public: virtual ~VisualProcessor() { From 3ed65eff8e3b05e0db530cafea6fdeac25e5527b Mon Sep 17 00:00:00 2001 From: vsonnier Date: Wed, 15 Feb 2017 20:27:57 +0100 Subject: [PATCH 5/8] BLOCKING_QUEUE: don't forget to cleanup if we try_push() --- src/demod/DemodulatorThread.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 5d1f2db..2a32e71 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -298,13 +298,17 @@ void DemodulatorThread::run() { ati_vis->type = 0; } - //non-blocking push for audio-out - localAudioVisOutputQueue->try_push(ati_vis); + if (!localAudioVisOutputQueue->try_push(ati_vis)) { + //non-blocking push for audio-out + ati_vis->setRefCount(0); + std::cout << "DemodulatorThread::run() cannot push ati_vis into localAudioVisOutputQueue, is full !" << std::endl; + std::this_thread::yield(); + } } if (ati != nullptr) { if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) { - //non-blocking push for audio-out + audioOutputQueue->push(ati); } else { ati->setRefCount(0); From 9e32141a5fb726c19c550e49269e215c40cb643e Mon Sep 17 00:00:00 2001 From: vsonnier Date: Wed, 15 Feb 2017 20:45:46 +0100 Subject: [PATCH 6/8] MISC4: Fix ref count of VisualDataDistributor (harmless, currently unused) --- src/process/VisualProcessor.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index 364dc57..7a9ee70 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -142,7 +142,12 @@ protected: } if (inp) { + int previousRefCount = inp->getRefCount(); VisualProcessor::distribute(inp); + //inp is now shared through the distribute(), which overwrite the previous ref count, + //so increment it properly. + int distributeRefCount = inp->getRefCount(); + inp->setRefCount(previousRefCount + distributeRefCount); } } } From 8dfb8ea3f1da17f9b053f86cd5843ff4319c241c Mon Sep 17 00:00:00 2001 From: "Charles J. Cliffe" Date: Thu, 16 Feb 2017 21:54:18 -0500 Subject: [PATCH 7/8] minor fixes --- src/util/ThreadBlockingQueue.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h index d78b923..47819d0 100644 --- a/src/util/ThreadBlockingQueue.h +++ b/src/util/ThreadBlockingQueue.h @@ -10,6 +10,7 @@ #include #include #include +#include #define MIN_ITEM_NB (1) @@ -242,7 +243,7 @@ public: std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex); - m_queue = sq.m_queue + m_queue = sq.m_queue; m_max_num_items = sq.m_max_num_items; if (!m_queue.empty()) { From 051ebec49fdf2ec2a26c383999008ed3b793af65 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Sat, 18 Feb 2017 10:15:47 +0100 Subject: [PATCH 8/8] BLOCKING_QUEUE: let audio-out try_push() too, else it often blocks evrything while in debugger... --- src/demod/DemodulatorThread.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 2a32e71..a186404 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -299,7 +299,7 @@ void DemodulatorThread::run() { } if (!localAudioVisOutputQueue->try_push(ati_vis)) { - //non-blocking push for audio-out + //non-blocking push needed for audio vis out ati_vis->setRefCount(0); std::cout << "DemodulatorThread::run() cannot push ati_vis into localAudioVisOutputQueue, is full !" << std::endl; std::this_thread::yield(); @@ -308,8 +308,12 @@ void DemodulatorThread::run() { if (ati != nullptr) { if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) { - - audioOutputQueue->push(ati); + //non-blocking push needed for audio out + if (!audioOutputQueue->try_push(ati)) { + ati->decRefCount(); + std::cout << "DemodulatorThread::run() cannot push ati into audioOutputQueue, is full !" << std::endl; + std::this_thread::yield(); + } } else { ati->setRefCount(0); }