diff --git a/CMakeLists.txt b/CMakeLists.txt index 5d9e058..ee0d4d8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -442,6 +442,7 @@ SET (cubicsdr_headers src/util/Gradient.h src/util/Timer.h src/util/ThreadQueue.h + src/util/ThreadBlockingQueue.h src/util/MouseTracker.h src/util/GLExt.h src/util/GLFont.h diff --git a/src/CubicSDR.h b/src/CubicSDR.h index 33bfa31..3dbbbc4 100644 --- a/src/CubicSDR.h +++ b/src/CubicSDR.h @@ -12,7 +12,7 @@ #include "GLExt.h" #include "PrimaryGLContext.h" -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "SoapySDRThread.h" #include "SDREnumerator.h" #include "SDRPostThread.h" diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index bb8b6d0..e0d4ad1 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -282,6 +282,7 @@ void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { AudioThreadCommand refreshDevice; refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE; refreshDevice.int_value = sampleRate; + //VSO : blocking push ! deviceController[deviceId]->getCommandQueue()->push(refreshDevice); } } @@ -479,6 +480,7 @@ void AudioThread::run() { void AudioThread::terminate() { IOThread::terminate(); AudioThreadCommand endCond; // push an empty input to bump the queue + //VSO: blocking push cmdQueue.push(endCond); } diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index 8eff8be..f472725 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -10,7 +10,7 @@ #include #include "AudioThread.h" -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "RtAudio.h" #include "DemodDefs.h" @@ -48,8 +48,8 @@ public: int int_value; }; -typedef ThreadQueue AudioThreadInputQueue; -typedef ThreadQueue AudioThreadCommandQueue; +typedef ThreadBlockingQueue AudioThreadInputQueue; +typedef ThreadBlockingQueue AudioThreadCommandQueue; class AudioThread : public IOThread { public: diff --git a/src/demod/DemodDefs.h b/src/demod/DemodDefs.h index 043ad41..d4a922a 100644 --- a/src/demod/DemodDefs.h +++ b/src/demod/DemodDefs.h @@ -3,10 +3,10 @@ #pragma once -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "CubicSDRDefs.h" #include "liquid/liquid.h" - +#include #include #include @@ -100,6 +100,6 @@ public: } }; -typedef ThreadQueue DemodulatorThreadInputQueue; -typedef ThreadQueue DemodulatorThreadPostInputQueue; -typedef ThreadQueue DemodulatorThreadControlCommandQueue; +typedef ThreadBlockingQueue DemodulatorThreadInputQueue; +typedef ThreadBlockingQueue DemodulatorThreadPostInputQueue; +typedef ThreadBlockingQueue DemodulatorThreadControlCommandQueue; diff --git a/src/demod/DemodulatorInstance.cpp b/src/demod/DemodulatorInstance.cpp index 61f4432..e8661ac 100644 --- a/src/demod/DemodulatorInstance.cpp +++ b/src/demod/DemodulatorInstance.cpp @@ -53,7 +53,9 @@ DemodulatorInstance::DemodulatorInstance() { user_label.store(new std::wstring()); pipeIQInputData = new DemodulatorThreadInputQueue; + pipeIQInputData->set_max_num_items(100); pipeIQDemodData = new DemodulatorThreadPostInputQueue; + pipeIQInputData->set_max_num_items(100); audioThread = new AudioThread(); @@ -62,7 +64,10 @@ DemodulatorInstance::DemodulatorInstance() { demodulatorPreThread->setOutputQueue("IQDataOutput",pipeIQDemodData); pipeAudioData = new AudioThreadInputQueue; + pipeAudioData->set_max_num_items(10); + threadQueueControl = new DemodulatorThreadControlCommandQueue; + threadQueueControl->set_max_num_items(2); demodulatorThread = new DemodulatorThread(this); demodulatorThread->setInputQueue("IQDataInput",pipeIQDemodData); @@ -241,6 +246,7 @@ void DemodulatorInstance::setActive(bool state) { void DemodulatorInstance::squelchAuto() { DemodulatorThreadControlCommand command; command.cmd = DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON; + //VSO: blocking push threadQueueControl->push(command); squelch = true; } @@ -257,6 +263,7 @@ void DemodulatorInstance::setSquelchEnabled(bool state) { } else if (state && !squelch) { DemodulatorThreadControlCommand command; command.cmd = DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON; + //VSO: blocking push! threadQueueControl->push(command); } @@ -292,6 +299,7 @@ void DemodulatorInstance::setOutputDevice(int device_id) { AudioThreadCommand command; command.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE; command.int_value = device_id; + //VSO: blocking push audioThread->getCommandQueue()->push(command); } setAudioSampleRate(AudioThread::deviceSampleRate[device_id]); diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index e4319a4..18fbff4 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -21,7 +21,10 @@ DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance *parent) : IOThre shiftFrequency = 0; workerQueue = new DemodulatorThreadWorkerCommandQueue; + workerQueue->set_max_num_items(2); + workerResults = new DemodulatorThreadWorkerResultQueue; + workerResults->set_max_num_items(100); workerThread = new DemodulatorWorkerThread(); workerThread->setInputQueue("WorkerCommandQueue",workerQueue); @@ -120,6 +123,7 @@ void DemodulatorPreThread::run() { } modemSettingsBuffered.clear(); modemSettingsChanged.store(false); + //VSO: blocking push workerQueue->push(command); cModem = nullptr; cModemKit = nullptr; @@ -140,6 +144,7 @@ void DemodulatorPreThread::run() { sampleRateChanged.store(false); audioSampleRateChanged.store(false); modemSettingsBuffered.clear(); + //VSO: blocking workerQueue->push(command); } @@ -209,11 +214,8 @@ void DemodulatorPreThread::run() { resamp->modemKit = cModemKit; resamp->sampleRate = currentBandwidth; - if (!iqOutputQueue->push(resamp)) { - resamp->setRefCount(0); - std::cout << "DemodulatorPreThread::run() cannot push resamp into iqOutputQueue, is full !" << std::endl; - std::this_thread::yield(); - } + //VSO: blocking push + iqOutputQueue->push(resamp); } inp->decRefCount(); @@ -343,11 +345,12 @@ int DemodulatorPreThread::getAudioSampleRate() { void DemodulatorPreThread::terminate() { IOThread::terminate(); DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue - if (!iqInputQueue->push(inp)) { - delete inp; - } - + + //VSO: blocking push : + iqInputQueue->push(inp); + DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL); + workerQueue->push(command); workerThread->terminate(); diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 16fa619..5d1f2db 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -298,23 +298,14 @@ void DemodulatorThread::run() { ati_vis->type = 0; } - if (!localAudioVisOutputQueue->push(ati_vis)) { - ati_vis->setRefCount(0); - std::cout << "DemodulatorThread::run() cannot push ati_vis into localAudioVisOutputQueue, is full !" << std::endl; - std::this_thread::yield(); - } + //non-blocking push for audio-out + localAudioVisOutputQueue->try_push(ati_vis); } - - + if (ati != nullptr) { if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) { - - if (!audioOutputQueue->push(ati)) { - ati->decRefCount(); - std::cout << "DemodulatorThread::run() cannot push ati into audioOutputQueue, is full !" << std::endl; - std::this_thread::yield(); - } - + //non-blocking push for audio-out + audioOutputQueue->push(ati); } else { ati->setRefCount(0); } @@ -367,9 +358,9 @@ void DemodulatorThread::run() { void DemodulatorThread::terminate() { IOThread::terminate(); DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue - if (!iqInputQueue->push(inp)) { - delete inp; - } + + //VSO: blocking push + iqInputQueue->push(inp); } bool DemodulatorThread::isMuted() { diff --git a/src/demod/DemodulatorThread.h b/src/demod/DemodulatorThread.h index 5709bb9..1df889c 100644 --- a/src/demod/DemodulatorThread.h +++ b/src/demod/DemodulatorThread.h @@ -10,7 +10,7 @@ #include "AudioThread.h" #include "Modem.h" -typedef ThreadQueue DemodulatorThreadOutputQueue; +typedef ThreadBlockingQueue DemodulatorThreadOutputQueue; #define DEMOD_VIS_SIZE 2048 #define DEMOD_SIGNAL_MIN -30 diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index 2c5c896..b1304a7 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -101,11 +101,10 @@ void DemodulatorWorkerThread::run() { result.modemType = cModemType; result.modemName = cModemName; + //VSO: blocking push resultQueue->push(result); } - } - // std::cout << "Demodulator worker thread done." << std::endl; } diff --git a/src/demod/DemodulatorWorkerThread.h b/src/demod/DemodulatorWorkerThread.h index e0b4065..cb56176 100644 --- a/src/demod/DemodulatorWorkerThread.h +++ b/src/demod/DemodulatorWorkerThread.h @@ -8,7 +8,7 @@ #include "liquid/liquid.h" #include "AudioThread.h" -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "CubicSDRDefs.h" #include "Modem.h" @@ -69,8 +69,8 @@ public: ModemSettings settings; }; -typedef ThreadQueue DemodulatorThreadWorkerCommandQueue; -typedef ThreadQueue DemodulatorThreadWorkerResultQueue; +typedef ThreadBlockingQueue DemodulatorThreadWorkerCommandQueue; +typedef ThreadBlockingQueue DemodulatorThreadWorkerResultQueue; class DemodulatorWorkerThread : public IOThread { public: diff --git a/src/process/FFTDataDistributor.cpp b/src/process/FFTDataDistributor.cpp index 7a3c66d..8636b70 100644 --- a/src/process/FFTDataDistributor.cpp +++ b/src/process/FFTDataDistributor.cpp @@ -3,6 +3,7 @@ #include "FFTDataDistributor.h" #include +#include FFTDataDistributor::FFTDataDistributor() : outputBuffers("FFTDataDistributorBuffers"), fftSize(DEFAULT_FFT_SIZE), linesPerSecond(DEFAULT_WATERFALL_LPS), lineRateAccum(0.0) { @@ -109,7 +110,8 @@ void FFTDataDistributor::process() { outp->sampleRate = inputBuffer.sampleRate; outp->data.assign(inputBuffer.data.begin()+bufferOffset+i, inputBuffer.data.begin()+bufferOffset+i+ fftSize); - distribute(outp); + //authorize distribute with losses + distribute(outp, NON_BLOCKING_TIMEOUT); while (lineRateAccum >= 1.0) { lineRateAccum -= 1.0; diff --git a/src/process/FFTVisualDataThread.cpp b/src/process/FFTVisualDataThread.cpp index 44e303d..c0c820e 100644 --- a/src/process/FFTVisualDataThread.cpp +++ b/src/process/FFTVisualDataThread.cpp @@ -53,7 +53,6 @@ void FFTVisualDataThread::run() { //this if fed by FFTDataDistributor which has a buffer of FFT_DISTRIBUTOR_BUFFER_IN_SECONDS //so sleep for << FFT_DISTRIBUTOR_BUFFER_IN_SECONDS not to be overflown std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0))); -// std::this_thread::yield(); int fftSize = wproc.getDesiredInputSize(); @@ -65,7 +64,6 @@ void FFTVisualDataThread::run() { if (lpsChanged.load()) { fftDistrib.setLinesPerSecond(linesPerSecond.load()); -// pipeIQDataIn->set_max_num_items(linesPerSecond.load()); lpsChanged.store(false); } diff --git a/src/process/ScopeVisualProcessor.h b/src/process/ScopeVisualProcessor.h index de0732c..d5ba64c 100644 --- a/src/process/ScopeVisualProcessor.h +++ b/src/process/ScopeVisualProcessor.h @@ -19,7 +19,7 @@ public: double fft_floor, fft_ceil; }; -typedef ThreadQueue ScopeRenderDataQueue; +typedef ThreadBlockingQueue ScopeRenderDataQueue; class ScopeVisualProcessor : public VisualProcessor { public: diff --git a/src/process/SpectrumVisualDataThread.cpp b/src/process/SpectrumVisualDataThread.cpp index d29796f..1e954a2 100644 --- a/src/process/SpectrumVisualDataThread.cpp +++ b/src/process/SpectrumVisualDataThread.cpp @@ -20,7 +20,7 @@ void SpectrumVisualDataThread::run() { while(!stopping) { //this if fed by FFTDataDistributor which has a buffer of FFT_DISTRIBUTOR_BUFFER_IN_SECONDS //so sleep for << FFT_DISTRIBUTOR_BUFFER_IN_SECONDS not to be overflown - std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0))); + std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0))); sproc.run(); } diff --git a/src/process/SpectrumVisualProcessor.h b/src/process/SpectrumVisualProcessor.h index 44467c2..bef6490 100644 --- a/src/process/SpectrumVisualProcessor.h +++ b/src/process/SpectrumVisualProcessor.h @@ -19,7 +19,7 @@ public: int bandwidth; }; -typedef ThreadQueue SpectrumVisualDataQueue; +typedef ThreadBlockingQueue SpectrumVisualDataQueue; class SpectrumVisualProcessor : public VisualProcessor { public: diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index 1db0b11..e72e1ed 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -4,7 +4,7 @@ #pragma once #include "CubicSDRDefs.h" -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "IOThread.h" #include #include @@ -12,8 +12,8 @@ template class VisualProcessor { // - typedef ThreadQueue VisualInputQueueType; - typedef ThreadQueue VisualOutputQueueType; + typedef typename ThreadBlockingQueue VisualInputQueueType; + typedef typename ThreadBlockingQueue VisualOutputQueueType; typedef typename std::vector< VisualOutputQueueType *>::iterator outputs_i; public: virtual ~VisualProcessor() { @@ -94,7 +94,9 @@ protected: //To be used by derived classes implementing //process() : will dispatch 'item' into as many //available outputs, previously set by attachOutput(). - void distribute(OutputDataType *item) { + //* \param[in] timeout The number of microseconds to wait to push an item in each one of the outputs, 0(default) means indefinite wait. + //* \param[in] errorMessage an error message written on std::cout in case pf push timeout. + void distribute(OutputDataType *item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") { std::lock_guard < std::recursive_mutex > busy_lock(busy_update); //We will try to distribute 'output' among all 'outputs', @@ -103,11 +105,11 @@ protected: item->setRefCount((int)outputs.size()); for (outputs_i it = outputs.begin(); it != outputs.end(); it++) { //if 'output' failed to be given to an outputs_i, dec its ref count accordingly. - if (!(*it)->push(item)) { + //blocking push, with a timeout + if (!(*it)->push(item, timeout, errorMessage)) { item->decRefCount(); } } - // Now 'item' refcount matches the times 'item' has been successfully distributed, //i.e shared among the outputs. } diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index f7b1c18..afefd60 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -108,12 +108,6 @@ void SDRPostThread::updateActiveDemodulators() { // deactivate if active if (demod->isActive() && !demod->isFollow() && !demod->isTracking()) { demod->setActive(false); - // DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData; - // dummyDataOut->frequency = frequency; - // dummyDataOut->sampleRate = sampleRate; - // if (!demodQueue->push(dummyDataOut)) { - // delete dummyDataOut; - // } } // follow if follow mode @@ -237,9 +231,8 @@ void SDRPostThread::run() { void SDRPostThread::terminate() { IOThread::terminate(); SDRThreadIQData *dummy = new SDRThreadIQData; - if (!iqDataInQueue->push(dummy)) { - delete dummy; - } + //VSO: blocking push + iqDataInQueue->push(dummy); } void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { @@ -300,34 +293,23 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], dataSize, &demodDataOut->data[0]); if (doDemodVisOut) { - if (!iqActiveDemodVisualQueue->push(demodDataOut)) { - demodDataOut->decRefCount(); - std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl; - std::this_thread::yield(); - } + //VSO: blocking push + iqActiveDemodVisualQueue->push(demodDataOut); } if (doIQDataOut) { - if (!iqDataOutQueue->push(demodDataOut)) { - demodDataOut->decRefCount(); - std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqDataOutQueue, is full !" << std::endl; - std::this_thread::yield(); - } + //VSO: blocking push + iqDataOutQueue->push(demodDataOut); } if (doVisOut) { - if (!iqVisualQueue->push(demodDataOut)) { - demodDataOut->decRefCount(); - std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqVisualQueue, is full !" << std::endl; - std::this_thread::yield(); - } + //VSO: blocking push + iqVisualQueue->push(demodDataOut); } for (size_t i = 0; i < nRunDemods; i++) { - if (!runDemods[i]->getIQInputDataPipe()->push(demodDataOut)) { - demodDataOut->decRefCount(); - std::this_thread::yield(); - } + //VSO: blocking push + runDemods[i]->getIQInputDataPipe()->push(demodDataOut); } } } @@ -365,19 +347,12 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { iqDataOut->sampleRate = data_in->sampleRate; iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize); - if (!iqDataOutQueue->push(iqDataOut)) { - std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqDataOutQueue, is full !" << std::endl; - iqDataOut->decRefCount(); - std::this_thread::yield(); - } - - - if (doVis) { - if (!iqVisualQueue->push(iqDataOut)) { - std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqVisualQueue, is full !" << std::endl; - iqDataOut->decRefCount(); - std::this_thread::yield(); - } + //VSO: blocking push + iqDataOutQueue->push(iqDataOut); + + if (doVis) { + //VSO: blocking push + iqVisualQueue->push(iqDataOut); } } @@ -473,21 +448,15 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { } if (doDemodVis) { - if (!iqActiveDemodVisualQueue->push(demodDataOut)) { - std::cout << "SDRPostThread::runPFBCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl; - demodDataOut->decRefCount(); - std::this_thread::yield(); - } + //VSO: blocking push + iqActiveDemodVisualQueue->push(demodDataOut); } for (size_t j = 0; j < nRunDemods; j++) { if (demodChannel[j] == i) { DemodulatorInstance *demod = runDemods[j]; - - if (!demod->getIQInputDataPipe()->push(demodDataOut)) { - demodDataOut->decRefCount(); - std::this_thread::yield(); - } + //VSO: blocking push + demod->getIQInputDataPipe()->push(demodDataOut); } } } diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index 0c1e36f..dd7938c 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -251,7 +251,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { dataOut->dcCorrected = hasHardwareDC.load(); dataOut->numChannels = numChannels.load(); - if (!iqDataOutQueue->push(dataOut)) { + if (!iqDataOutQueue->try_push(dataOut)) { //The rest of the system saturates, //finally the push didn't suceeded, recycle dataOut immediatly. dataOut->setRefCount(0); diff --git a/src/sdr/SoapySDRThread.h b/src/sdr/SoapySDRThread.h index 794b837..2982ac1 100644 --- a/src/sdr/SoapySDRThread.h +++ b/src/sdr/SoapySDRThread.h @@ -5,7 +5,7 @@ #include -#include "ThreadQueue.h" +#include "ThreadBlockingQueue.h" #include "DemodulatorMgr.h" #include "SDRDeviceInfo.h" #include "AppConfig.h" @@ -39,7 +39,7 @@ public: } }; -typedef ThreadQueue SDRThreadIQDataQueue; +typedef ThreadBlockingQueue SDRThreadIQDataQueue; class SDRThread : public IOThread { private: diff --git a/src/util/ThreadBlockingQueue.cpp b/src/util/ThreadBlockingQueue.cpp new file mode 100644 index 0000000..68f23c3 --- /dev/null +++ b/src/util/ThreadBlockingQueue.cpp @@ -0,0 +1,4 @@ +// Copyright (c) Charles J. Cliffe +// SPDX-License-Identifier: GPL-2.0+ + +#include \ No newline at end of file diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h new file mode 100644 index 0000000..10fad27 --- /dev/null +++ b/src/util/ThreadBlockingQueue.h @@ -0,0 +1,268 @@ +// Copyright (c) Charles J. Cliffe +// SPDX-License-Identifier: GPL-2.0+ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#define MIN_ITEM_NB (1) + +//use this timeout constant in either pop() or push() calls to indicate +// a non-blocking operation, so respectively equivalent to try_pop() and try_push() +#define NON_BLOCKING_TIMEOUT (100) + +//use this timeout constant in either pop() or push() calls to indicate +//an indefnite timeout duration. +#define BLOCKING_INFINITE_TIMEOUT (0) + +/** A thread-safe asynchronous blocking queue */ +template +class ThreadBlockingQueue : public ThreadQueueBase { + + typedef typename std::deque::value_type value_type; + typedef typename std::deque::size_type size_type; + +public: + + + /*! Create safe blocking queue. */ + ThreadBlockingQueue() { + //at least 1 (== Exchanger) + m_max_num_items = MIN_ITEM_NB; + }; + + ThreadBlockingQueue(const ThreadBlockingQueue& sq) { + std::lock_guard < std::mutex > lock(sq.m_mutex); + m_queue = sq.m_queue; + m_max_num_items = sq.m_max_num_items; + } + + /*! Destroy safe queue. */ + ~ThreadBlockingQueue() { + std::lock_guard < std::mutex > lock(m_mutex); + } + + /** + * Sets the maximum number of items in the queue. Real value is clamped + * to 1 on the lower bound. + * \param[in] nb max of items + */ + void set_max_num_items(unsigned int max_num_items) { + std::lock_guard < std::mutex > lock(m_mutex); + + if (max_num_items > m_max_num_items) { + //Only raise the existing max size, never squash it + //for simplification sake at runtime. + m_max_num_items = max_num_items; + m_cond_not_full.notify_all(); + } + } + + /** + * Pushes the item into the queue. If the queue is full, waits until room + * is available, for at most timeout microseconds. + * \param[in] item An item. + * \param[in] timeout a max waiting timeout in microseconds for an item to be pushed. + * by default, = 0 means indefinite wait. + * \param[in] errorMessage an error message written on std::cout in case of the timeout wait + * \return true if an item was pushed into the queue, else a timeout has occured. + */ + bool push(const value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT,const char* errorMessage = "") { + std::unique_lock < std::mutex > lock(m_mutex); + + if (timeout == BLOCKING_INFINITE_TIMEOUT) { + m_cond_not_full.wait(lock, [this]() // Lambda funct + { + return m_queue.size() < m_max_num_items; + }); + } else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.size() >= m_max_num_items) { + // if the value is below a threshold, consider it is a try_push() + return false; + } + else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout), + [this]() { return m_queue.size() < m_max_num_items; })) { + std::cout << "WARNING: Thread 0x" << std::hex << std::this_thread::get_id() << std::dec << + " executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << + (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; + return false; + } + + m_queue.push_back(item); + m_cond_not_empty.notify_all(); + return true; + } + + /** + * Try to pushes the item into the queue, immediatly, without waiting. If the queue is full, the item + * is not inserted and the function returns false. + * \param[in] item An item. + */ + bool try_push(const value_type& item) { + std::lock_guard < std::mutex > lock(m_mutex); + + if (m_queue.size() >= m_max_num_items) { + return false; + } + + m_queue.push_back(item); + m_cond_not_empty.notify_all(); + return true; + } + + /** + * Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available. + * \param[in] timeout The number of microseconds to wait. O (default) means indefinite wait. + * \param[in] errorMessage an error message written on std::cout in case of the timeout wait + * \return true if get an item from the queue, false if no item is received before the timeout. + */ + bool pop(value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") { + std::unique_lock < std::mutex > lock(m_mutex); + + if (timeout == BLOCKING_INFINITE_TIMEOUT) { + m_cond_not_empty.wait(lock, [this]() // Lambda funct + { + return !m_queue.empty(); + }); + } else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.empty()) { + // if the value is below a threshold, consider it is try_pop() + return false; + } + else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout), + [this]() { return !m_queue.empty(); })) { + std::cout << "WARNING: Thread 0x" << std::hex << std::this_thread::get_id() << std::dec << + " executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << + (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; + return false; + } + + item = m_queue.front(); + m_queue.pop_front(); + m_cond_not_full.notify_all(); + return true; + } + + /** + * Tries to pop item from the queue. + * \param[out] item The item. + * \return False is returned if no item is available. + */ + bool try_pop(value_type& item) { + std::lock_guard < std::mutex > lock(m_mutex); + + if (m_queue.empty()) { + return false; + } + + item = m_queue.front(); + m_queue.pop_front(); + m_cond_not_full.notify_all(); + return true; + } + + + /** + * Gets the number of items in the queue. + * \return Number of items in the queue. + */ + size_type size() const { + std::lock_guard < std::mutex > lock(m_mutex); + return m_queue.size(); + } + + /** + * Check if the queue is empty. + * \return true if queue is empty. + */ + bool empty() const { + std::lock_guard < std::mutex > lock(m_mutex); + return m_queue.empty(); + } + + /** + * Check if the queue is full. + * \return true if queue is full. + */ + bool full() const { + std::lock_guard < std::mutex > lock(m_mutex); + return (m_queue.size() >= m_max_num_items); + } + + /** + * Remove any items in the queue. + */ + void flush() { + std::lock_guard < std::mutex > lock(m_mutex); + m_queue.clear(); + m_cond_not_full.notify_all(); + } + + /** + * Swaps the contents. + * \param[out] sq The ThreadBlockingQueue to swap with 'this'. + */ + void swap(ThreadBlockingQueue& sq) { + if (this != &sq) { + std::lock_guard < std::mutex > lock1(m_mutex); + std::lock_guard < std::mutex > lock2(sq.m_mutex); + m_queue.swap(sq.m_queue); + std::swap(m_max_num_items, sq.m_max_num_items); + + if (!m_queue.empty()) { + m_cond_not_empty.notify_all(); + } + + if (!sq.m_queue.empty()) { + sq.m_cond_not_empty.notify_all(); + } + + if (!m_queue.full()) { + m_cond_not_full.notify_all(); + } + + if (!sq.m_queue.full()) { + sq.m_cond_not_full.notify_all(); + } + } + } + + /*! The copy assignment operator */ + ThreadBlockingQueue& operator=(const ThreadBlockingQueue& sq) { + if (this != &sq) { + std::lock_guard < std::mutex > lock1(m_mutex); + std::lock_guard < std::mutex > lock2(sq.m_mutex); + + m_queue = sq.m_queue + m_max_num_items = sq.m_max_num_items; + + if (!m_queue.empty()) { + m_cond_not_empty.notify_all(); + } + + if (!m_queue.full()) { + m_cond_not_full.notify_all(); + } + } + return *this; + } + +private: + //TODO: use a circular buffer structure ? (fixed array + modulo) + std::deque m_queue; + + mutable std::mutex m_mutex; + std::condition_variable m_cond_not_empty; + std::condition_variable m_cond_not_full; + size_t m_max_num_items = MIN_ITEM_NB; +}; + +/*! Swaps the contents of two ThreadBlockingQueue objects. (external operator) */ +template +void swap(ThreadBlockingQueue& q1, ThreadBlockingQueue& q2) { + q1.swap(q2); +}