BLOCKING_QUEUE: prepare by raising up max queue lenghts

BLOCKING_QUEUE: Replaced ThreadQueue usage by ThreadBlockingQueue usage
BLOCKING_QUEUE: instrument all push() with timeouts, showed some call have to be non-blocking...
BLOCKING_QUEUE: tuned push()/try_push()
This commit is contained in:
vsonnier 2017-02-09 19:12:12 +01:00
parent e173eec3ef
commit c7467a88bc
22 changed files with 354 additions and 107 deletions

View File

@ -442,6 +442,7 @@ SET (cubicsdr_headers
src/util/Gradient.h
src/util/Timer.h
src/util/ThreadQueue.h
src/util/ThreadBlockingQueue.h
src/util/MouseTracker.h
src/util/GLExt.h
src/util/GLFont.h

View File

@ -12,7 +12,7 @@
#include "GLExt.h"
#include "PrimaryGLContext.h"
#include "ThreadQueue.h"
#include "ThreadBlockingQueue.h"
#include "SoapySDRThread.h"
#include "SDREnumerator.h"
#include "SDRPostThread.h"

View File

@ -282,6 +282,7 @@ void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
AudioThreadCommand refreshDevice;
refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE;
refreshDevice.int_value = sampleRate;
//VSO : blocking push !
deviceController[deviceId]->getCommandQueue()->push(refreshDevice);
}
}
@ -479,6 +480,7 @@ void AudioThread::run() {
void AudioThread::terminate() {
IOThread::terminate();
AudioThreadCommand endCond; // push an empty input to bump the queue
//VSO: blocking push
cmdQueue.push(endCond);
}

View File

@ -10,7 +10,7 @@
#include <atomic>
#include "AudioThread.h"
#include "ThreadQueue.h"
#include "ThreadBlockingQueue.h"
#include "RtAudio.h"
#include "DemodDefs.h"
@ -48,8 +48,8 @@ public:
int int_value;
};
typedef ThreadQueue<AudioThreadInput *> AudioThreadInputQueue;
typedef ThreadQueue<AudioThreadCommand> AudioThreadCommandQueue;
typedef ThreadBlockingQueue<AudioThreadInput *> AudioThreadInputQueue;
typedef ThreadBlockingQueue<AudioThreadCommand> AudioThreadCommandQueue;
class AudioThread : public IOThread {
public:

View File

@ -3,10 +3,10 @@
#pragma once
#include "ThreadQueue.h"
#include "ThreadBlockingQueue.h"
#include "CubicSDRDefs.h"
#include "liquid/liquid.h"
#include <vector>
#include <atomic>
#include <mutex>
@ -100,6 +100,6 @@ public:
}
};
typedef ThreadQueue<DemodulatorThreadIQData *> DemodulatorThreadInputQueue;
typedef ThreadQueue<DemodulatorThreadPostIQData *> DemodulatorThreadPostInputQueue;
typedef ThreadQueue<DemodulatorThreadControlCommand> DemodulatorThreadControlCommandQueue;
typedef ThreadBlockingQueue<DemodulatorThreadIQData *> DemodulatorThreadInputQueue;
typedef ThreadBlockingQueue<DemodulatorThreadPostIQData *> DemodulatorThreadPostInputQueue;
typedef ThreadBlockingQueue<DemodulatorThreadControlCommand> DemodulatorThreadControlCommandQueue;

View File

@ -53,7 +53,9 @@ DemodulatorInstance::DemodulatorInstance() {
user_label.store(new std::wstring());
pipeIQInputData = new DemodulatorThreadInputQueue;
pipeIQInputData->set_max_num_items(100);
pipeIQDemodData = new DemodulatorThreadPostInputQueue;
pipeIQInputData->set_max_num_items(100);
audioThread = new AudioThread();
@ -62,7 +64,10 @@ DemodulatorInstance::DemodulatorInstance() {
demodulatorPreThread->setOutputQueue("IQDataOutput",pipeIQDemodData);
pipeAudioData = new AudioThreadInputQueue;
pipeAudioData->set_max_num_items(10);
threadQueueControl = new DemodulatorThreadControlCommandQueue;
threadQueueControl->set_max_num_items(2);
demodulatorThread = new DemodulatorThread(this);
demodulatorThread->setInputQueue("IQDataInput",pipeIQDemodData);
@ -241,6 +246,7 @@ void DemodulatorInstance::setActive(bool state) {
void DemodulatorInstance::squelchAuto() {
DemodulatorThreadControlCommand command;
command.cmd = DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON;
//VSO: blocking push
threadQueueControl->push(command);
squelch = true;
}
@ -257,6 +263,7 @@ void DemodulatorInstance::setSquelchEnabled(bool state) {
} else if (state && !squelch) {
DemodulatorThreadControlCommand command;
command.cmd = DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON;
//VSO: blocking push!
threadQueueControl->push(command);
}
@ -292,6 +299,7 @@ void DemodulatorInstance::setOutputDevice(int device_id) {
AudioThreadCommand command;
command.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE;
command.int_value = device_id;
//VSO: blocking push
audioThread->getCommandQueue()->push(command);
}
setAudioSampleRate(AudioThread::deviceSampleRate[device_id]);

View File

@ -21,7 +21,10 @@ DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance *parent) : IOThre
shiftFrequency = 0;
workerQueue = new DemodulatorThreadWorkerCommandQueue;
workerQueue->set_max_num_items(2);
workerResults = new DemodulatorThreadWorkerResultQueue;
workerResults->set_max_num_items(100);
workerThread = new DemodulatorWorkerThread();
workerThread->setInputQueue("WorkerCommandQueue",workerQueue);
@ -120,6 +123,7 @@ void DemodulatorPreThread::run() {
}
modemSettingsBuffered.clear();
modemSettingsChanged.store(false);
//VSO: blocking push
workerQueue->push(command);
cModem = nullptr;
cModemKit = nullptr;
@ -140,6 +144,7 @@ void DemodulatorPreThread::run() {
sampleRateChanged.store(false);
audioSampleRateChanged.store(false);
modemSettingsBuffered.clear();
//VSO: blocking
workerQueue->push(command);
}
@ -209,11 +214,8 @@ void DemodulatorPreThread::run() {
resamp->modemKit = cModemKit;
resamp->sampleRate = currentBandwidth;
if (!iqOutputQueue->push(resamp)) {
resamp->setRefCount(0);
std::cout << "DemodulatorPreThread::run() cannot push resamp into iqOutputQueue, is full !" << std::endl;
std::this_thread::yield();
}
//VSO: blocking push
iqOutputQueue->push(resamp);
}
inp->decRefCount();
@ -343,11 +345,12 @@ int DemodulatorPreThread::getAudioSampleRate() {
void DemodulatorPreThread::terminate() {
IOThread::terminate();
DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue
if (!iqInputQueue->push(inp)) {
delete inp;
}
//VSO: blocking push :
iqInputQueue->push(inp);
DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL);
workerQueue->push(command);
workerThread->terminate();

View File

@ -298,23 +298,14 @@ void DemodulatorThread::run() {
ati_vis->type = 0;
}
if (!localAudioVisOutputQueue->push(ati_vis)) {
ati_vis->setRefCount(0);
std::cout << "DemodulatorThread::run() cannot push ati_vis into localAudioVisOutputQueue, is full !" << std::endl;
std::this_thread::yield();
}
//non-blocking push for audio-out
localAudioVisOutputQueue->try_push(ati_vis);
}
if (ati != nullptr) {
if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) {
if (!audioOutputQueue->push(ati)) {
ati->decRefCount();
std::cout << "DemodulatorThread::run() cannot push ati into audioOutputQueue, is full !" << std::endl;
std::this_thread::yield();
}
//non-blocking push for audio-out
audioOutputQueue->push(ati);
} else {
ati->setRefCount(0);
}
@ -367,9 +358,9 @@ void DemodulatorThread::run() {
void DemodulatorThread::terminate() {
IOThread::terminate();
DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue
if (!iqInputQueue->push(inp)) {
delete inp;
}
//VSO: blocking push
iqInputQueue->push(inp);
}
bool DemodulatorThread::isMuted() {

View File

@ -10,7 +10,7 @@
#include "AudioThread.h"
#include "Modem.h"
typedef ThreadQueue<AudioThreadInput *> DemodulatorThreadOutputQueue;
typedef ThreadBlockingQueue<AudioThreadInput *> DemodulatorThreadOutputQueue;
#define DEMOD_VIS_SIZE 2048
#define DEMOD_SIGNAL_MIN -30

View File

@ -101,11 +101,10 @@ void DemodulatorWorkerThread::run() {
result.modemType = cModemType;
result.modemName = cModemName;
//VSO: blocking push
resultQueue->push(result);
}
}
// std::cout << "Demodulator worker thread done." << std::endl;
}

View File

@ -8,7 +8,7 @@
#include "liquid/liquid.h"
#include "AudioThread.h"
#include "ThreadQueue.h"
#include "ThreadBlockingQueue.h"
#include "CubicSDRDefs.h"
#include "Modem.h"
@ -69,8 +69,8 @@ public:
ModemSettings settings;
};
typedef ThreadQueue<DemodulatorWorkerThreadCommand> DemodulatorThreadWorkerCommandQueue;
typedef ThreadQueue<DemodulatorWorkerThreadResult> DemodulatorThreadWorkerResultQueue;
typedef ThreadBlockingQueue<DemodulatorWorkerThreadCommand> DemodulatorThreadWorkerCommandQueue;
typedef ThreadBlockingQueue<DemodulatorWorkerThreadResult> DemodulatorThreadWorkerResultQueue;
class DemodulatorWorkerThread : public IOThread {
public:

View File

@ -3,6 +3,7 @@
#include "FFTDataDistributor.h"
#include <algorithm>
#include <ThreadBlockingQueue.h>
FFTDataDistributor::FFTDataDistributor() : outputBuffers("FFTDataDistributorBuffers"), fftSize(DEFAULT_FFT_SIZE), linesPerSecond(DEFAULT_WATERFALL_LPS), lineRateAccum(0.0) {
@ -109,7 +110,8 @@ void FFTDataDistributor::process() {
outp->sampleRate = inputBuffer.sampleRate;
outp->data.assign(inputBuffer.data.begin()+bufferOffset+i,
inputBuffer.data.begin()+bufferOffset+i+ fftSize);
distribute(outp);
//authorize distribute with losses
distribute(outp, NON_BLOCKING_TIMEOUT);
while (lineRateAccum >= 1.0) {
lineRateAccum -= 1.0;

View File

@ -53,7 +53,6 @@ void FFTVisualDataThread::run() {
//this if fed by FFTDataDistributor which has a buffer of FFT_DISTRIBUTOR_BUFFER_IN_SECONDS
//so sleep for << FFT_DISTRIBUTOR_BUFFER_IN_SECONDS not to be overflown
std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0)));
// std::this_thread::yield();
int fftSize = wproc.getDesiredInputSize();
@ -65,7 +64,6 @@ void FFTVisualDataThread::run() {
if (lpsChanged.load()) {
fftDistrib.setLinesPerSecond(linesPerSecond.load());
// pipeIQDataIn->set_max_num_items(linesPerSecond.load());
lpsChanged.store(false);
}

View File

@ -19,7 +19,7 @@ public:
double fft_floor, fft_ceil;
};
typedef ThreadQueue<ScopeRenderData *> ScopeRenderDataQueue;
typedef ThreadBlockingQueue<ScopeRenderData *> ScopeRenderDataQueue;
class ScopeVisualProcessor : public VisualProcessor<AudioThreadInput, ScopeRenderData> {
public:

View File

@ -20,7 +20,7 @@ void SpectrumVisualDataThread::run() {
while(!stopping) {
//this if fed by FFTDataDistributor which has a buffer of FFT_DISTRIBUTOR_BUFFER_IN_SECONDS
//so sleep for << FFT_DISTRIBUTOR_BUFFER_IN_SECONDS not to be overflown
std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0)));
std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0)));
sproc.run();
}

View File

@ -19,7 +19,7 @@ public:
int bandwidth;
};
typedef ThreadQueue<SpectrumVisualData *> SpectrumVisualDataQueue;
typedef ThreadBlockingQueue<SpectrumVisualData *> SpectrumVisualDataQueue;
class SpectrumVisualProcessor : public VisualProcessor<DemodulatorThreadIQData, SpectrumVisualData> {
public:

View File

@ -4,7 +4,7 @@
#pragma once
#include "CubicSDRDefs.h"
#include "ThreadQueue.h"
#include "ThreadBlockingQueue.h"
#include "IOThread.h"
#include <algorithm>
#include <vector>
@ -12,8 +12,8 @@
template<typename InputDataType = ReferenceCounter, typename OutputDataType = ReferenceCounter>
class VisualProcessor {
//
typedef ThreadQueue<InputDataType*> VisualInputQueueType;
typedef ThreadQueue<OutputDataType*> VisualOutputQueueType;
typedef typename ThreadBlockingQueue<InputDataType*> VisualInputQueueType;
typedef typename ThreadBlockingQueue<OutputDataType*> VisualOutputQueueType;
typedef typename std::vector< VisualOutputQueueType *>::iterator outputs_i;
public:
virtual ~VisualProcessor() {
@ -94,7 +94,9 @@ protected:
//To be used by derived classes implementing
//process() : will dispatch 'item' into as many
//available outputs, previously set by attachOutput().
void distribute(OutputDataType *item) {
//* \param[in] timeout The number of microseconds to wait to push an item in each one of the outputs, 0(default) means indefinite wait.
//* \param[in] errorMessage an error message written on std::cout in case pf push timeout.
void distribute(OutputDataType *item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
//We will try to distribute 'output' among all 'outputs',
@ -103,11 +105,11 @@ protected:
item->setRefCount((int)outputs.size());
for (outputs_i it = outputs.begin(); it != outputs.end(); it++) {
//if 'output' failed to be given to an outputs_i, dec its ref count accordingly.
if (!(*it)->push(item)) {
//blocking push, with a timeout
if (!(*it)->push(item, timeout, errorMessage)) {
item->decRefCount();
}
}
// Now 'item' refcount matches the times 'item' has been successfully distributed,
//i.e shared among the outputs.
}

View File

@ -108,12 +108,6 @@ void SDRPostThread::updateActiveDemodulators() {
// deactivate if active
if (demod->isActive() && !demod->isFollow() && !demod->isTracking()) {
demod->setActive(false);
// DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData;
// dummyDataOut->frequency = frequency;
// dummyDataOut->sampleRate = sampleRate;
// if (!demodQueue->push(dummyDataOut)) {
// delete dummyDataOut;
// }
}
// follow if follow mode
@ -237,9 +231,8 @@ void SDRPostThread::run() {
void SDRPostThread::terminate() {
IOThread::terminate();
SDRThreadIQData *dummy = new SDRThreadIQData;
if (!iqDataInQueue->push(dummy)) {
delete dummy;
}
//VSO: blocking push
iqDataInQueue->push(dummy);
}
void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
@ -300,34 +293,23 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], dataSize, &demodDataOut->data[0]);
if (doDemodVisOut) {
if (!iqActiveDemodVisualQueue->push(demodDataOut)) {
demodDataOut->decRefCount();
std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl;
std::this_thread::yield();
}
//VSO: blocking push
iqActiveDemodVisualQueue->push(demodDataOut);
}
if (doIQDataOut) {
if (!iqDataOutQueue->push(demodDataOut)) {
demodDataOut->decRefCount();
std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqDataOutQueue, is full !" << std::endl;
std::this_thread::yield();
}
//VSO: blocking push
iqDataOutQueue->push(demodDataOut);
}
if (doVisOut) {
if (!iqVisualQueue->push(demodDataOut)) {
demodDataOut->decRefCount();
std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqVisualQueue, is full !" << std::endl;
std::this_thread::yield();
}
//VSO: blocking push
iqVisualQueue->push(demodDataOut);
}
for (size_t i = 0; i < nRunDemods; i++) {
if (!runDemods[i]->getIQInputDataPipe()->push(demodDataOut)) {
demodDataOut->decRefCount();
std::this_thread::yield();
}
//VSO: blocking push
runDemods[i]->getIQInputDataPipe()->push(demodDataOut);
}
}
}
@ -365,19 +347,12 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
iqDataOut->sampleRate = data_in->sampleRate;
iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize);
if (!iqDataOutQueue->push(iqDataOut)) {
std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqDataOutQueue, is full !" << std::endl;
iqDataOut->decRefCount();
std::this_thread::yield();
}
if (doVis) {
if (!iqVisualQueue->push(iqDataOut)) {
std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqVisualQueue, is full !" << std::endl;
iqDataOut->decRefCount();
std::this_thread::yield();
}
//VSO: blocking push
iqDataOutQueue->push(iqDataOut);
if (doVis) {
//VSO: blocking push
iqVisualQueue->push(iqDataOut);
}
}
@ -473,21 +448,15 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
}
if (doDemodVis) {
if (!iqActiveDemodVisualQueue->push(demodDataOut)) {
std::cout << "SDRPostThread::runPFBCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl;
demodDataOut->decRefCount();
std::this_thread::yield();
}
//VSO: blocking push
iqActiveDemodVisualQueue->push(demodDataOut);
}
for (size_t j = 0; j < nRunDemods; j++) {
if (demodChannel[j] == i) {
DemodulatorInstance *demod = runDemods[j];
if (!demod->getIQInputDataPipe()->push(demodDataOut)) {
demodDataOut->decRefCount();
std::this_thread::yield();
}
//VSO: blocking push
demod->getIQInputDataPipe()->push(demodDataOut);
}
}
}

View File

@ -251,7 +251,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
dataOut->dcCorrected = hasHardwareDC.load();
dataOut->numChannels = numChannels.load();
if (!iqDataOutQueue->push(dataOut)) {
if (!iqDataOutQueue->try_push(dataOut)) {
//The rest of the system saturates,
//finally the push didn't suceeded, recycle dataOut immediatly.
dataOut->setRefCount(0);

View File

@ -5,7 +5,7 @@
#include <atomic>
#include "ThreadQueue.h"
#include "ThreadBlockingQueue.h"
#include "DemodulatorMgr.h"
#include "SDRDeviceInfo.h"
#include "AppConfig.h"
@ -39,7 +39,7 @@ public:
}
};
typedef ThreadQueue<SDRThreadIQData *> SDRThreadIQDataQueue;
typedef ThreadBlockingQueue<SDRThreadIQData *> SDRThreadIQDataQueue;
class SDRThread : public IOThread {
private:

View File

@ -0,0 +1,4 @@
// Copyright (c) Charles J. Cliffe
// SPDX-License-Identifier: GPL-2.0+
#include <ThreadBlockingQueue.h>

View File

@ -0,0 +1,268 @@
// Copyright (c) Charles J. Cliffe
// SPDX-License-Identifier: GPL-2.0+
#pragma once
#include <deque>
#include <mutex>
#include <thread>
#include <cstdint>
#include <stddef.h>
#include <condition_variable>
#include <ThreadQueue.h>
#define MIN_ITEM_NB (1)
//use this timeout constant in either pop() or push() calls to indicate
// a non-blocking operation, so respectively equivalent to try_pop() and try_push()
#define NON_BLOCKING_TIMEOUT (100)
//use this timeout constant in either pop() or push() calls to indicate
//an indefnite timeout duration.
#define BLOCKING_INFINITE_TIMEOUT (0)
/** A thread-safe asynchronous blocking queue */
template<typename T>
class ThreadBlockingQueue : public ThreadQueueBase {
typedef typename std::deque<T>::value_type value_type;
typedef typename std::deque<T>::size_type size_type;
public:
/*! Create safe blocking queue. */
ThreadBlockingQueue() {
//at least 1 (== Exchanger)
m_max_num_items = MIN_ITEM_NB;
};
ThreadBlockingQueue(const ThreadBlockingQueue& sq) {
std::lock_guard < std::mutex > lock(sq.m_mutex);
m_queue = sq.m_queue;
m_max_num_items = sq.m_max_num_items;
}
/*! Destroy safe queue. */
~ThreadBlockingQueue() {
std::lock_guard < std::mutex > lock(m_mutex);
}
/**
* Sets the maximum number of items in the queue. Real value is clamped
* to 1 on the lower bound.
* \param[in] nb max of items
*/
void set_max_num_items(unsigned int max_num_items) {
std::lock_guard < std::mutex > lock(m_mutex);
if (max_num_items > m_max_num_items) {
//Only raise the existing max size, never squash it
//for simplification sake at runtime.
m_max_num_items = max_num_items;
m_cond_not_full.notify_all();
}
}
/**
* Pushes the item into the queue. If the queue is full, waits until room
* is available, for at most timeout microseconds.
* \param[in] item An item.
* \param[in] timeout a max waiting timeout in microseconds for an item to be pushed.
* by default, = 0 means indefinite wait.
* \param[in] errorMessage an error message written on std::cout in case of the timeout wait
* \return true if an item was pushed into the queue, else a timeout has occured.
*/
bool push(const value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT,const char* errorMessage = "") {
std::unique_lock < std::mutex > lock(m_mutex);
if (timeout == BLOCKING_INFINITE_TIMEOUT) {
m_cond_not_full.wait(lock, [this]() // Lambda funct
{
return m_queue.size() < m_max_num_items;
});
} else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.size() >= m_max_num_items) {
// if the value is below a threshold, consider it is a try_push()
return false;
}
else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout),
[this]() { return m_queue.size() < m_max_num_items; })) {
std::cout << "WARNING: Thread 0x" << std::hex << std::this_thread::get_id() << std::dec <<
" executing {" << typeid(*this).name() << "}.push() has failed with timeout > " <<
(timeout * 0.001) << " ms, message: " << errorMessage << std::endl;
return false;
}
m_queue.push_back(item);
m_cond_not_empty.notify_all();
return true;
}
/**
* Try to pushes the item into the queue, immediatly, without waiting. If the queue is full, the item
* is not inserted and the function returns false.
* \param[in] item An item.
*/
bool try_push(const value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex);
if (m_queue.size() >= m_max_num_items) {
return false;
}
m_queue.push_back(item);
m_cond_not_empty.notify_all();
return true;
}
/**
* Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available.
* \param[in] timeout The number of microseconds to wait. O (default) means indefinite wait.
* \param[in] errorMessage an error message written on std::cout in case of the timeout wait
* \return true if get an item from the queue, false if no item is received before the timeout.
*/
bool pop(value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") {
std::unique_lock < std::mutex > lock(m_mutex);
if (timeout == BLOCKING_INFINITE_TIMEOUT) {
m_cond_not_empty.wait(lock, [this]() // Lambda funct
{
return !m_queue.empty();
});
} else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.empty()) {
// if the value is below a threshold, consider it is try_pop()
return false;
}
else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout),
[this]() { return !m_queue.empty(); })) {
std::cout << "WARNING: Thread 0x" << std::hex << std::this_thread::get_id() << std::dec <<
" executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " <<
(timeout * 0.001) << " ms, message: " << errorMessage << std::endl;
return false;
}
item = m_queue.front();
m_queue.pop_front();
m_cond_not_full.notify_all();
return true;
}
/**
* Tries to pop item from the queue.
* \param[out] item The item.
* \return False is returned if no item is available.
*/
bool try_pop(value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex);
if (m_queue.empty()) {
return false;
}
item = m_queue.front();
m_queue.pop_front();
m_cond_not_full.notify_all();
return true;
}
/**
* Gets the number of items in the queue.
* \return Number of items in the queue.
*/
size_type size() const {
std::lock_guard < std::mutex > lock(m_mutex);
return m_queue.size();
}
/**
* Check if the queue is empty.
* \return true if queue is empty.
*/
bool empty() const {
std::lock_guard < std::mutex > lock(m_mutex);
return m_queue.empty();
}
/**
* Check if the queue is full.
* \return true if queue is full.
*/
bool full() const {
std::lock_guard < std::mutex > lock(m_mutex);
return (m_queue.size() >= m_max_num_items);
}
/**
* Remove any items in the queue.
*/
void flush() {
std::lock_guard < std::mutex > lock(m_mutex);
m_queue.clear();
m_cond_not_full.notify_all();
}
/**
* Swaps the contents.
* \param[out] sq The ThreadBlockingQueue to swap with 'this'.
*/
void swap(ThreadBlockingQueue& sq) {
if (this != &sq) {
std::lock_guard < std::mutex > lock1(m_mutex);
std::lock_guard < std::mutex > lock2(sq.m_mutex);
m_queue.swap(sq.m_queue);
std::swap(m_max_num_items, sq.m_max_num_items);
if (!m_queue.empty()) {
m_cond_not_empty.notify_all();
}
if (!sq.m_queue.empty()) {
sq.m_cond_not_empty.notify_all();
}
if (!m_queue.full()) {
m_cond_not_full.notify_all();
}
if (!sq.m_queue.full()) {
sq.m_cond_not_full.notify_all();
}
}
}
/*! The copy assignment operator */
ThreadBlockingQueue& operator=(const ThreadBlockingQueue& sq) {
if (this != &sq) {
std::lock_guard < std::mutex > lock1(m_mutex);
std::lock_guard < std::mutex > lock2(sq.m_mutex);
m_queue = sq.m_queue
m_max_num_items = sq.m_max_num_items;
if (!m_queue.empty()) {
m_cond_not_empty.notify_all();
}
if (!m_queue.full()) {
m_cond_not_full.notify_all();
}
}
return *this;
}
private:
//TODO: use a circular buffer structure ? (fixed array + modulo)
std::deque<T> m_queue;
mutable std::mutex m_mutex;
std::condition_variable m_cond_not_empty;
std::condition_variable m_cond_not_full;
size_t m_max_num_items = MIN_ITEM_NB;
};
/*! Swaps the contents of two ThreadBlockingQueue objects. (external operator) */
template<typename T>
void swap(ThreadBlockingQueue<T>& q1, ThreadBlockingQueue<T>& q2) {
q1.swap(q2);
}