Merge branch 'vsonnier-audioVisOutputQueue_nullptr_crash_and_misc_changes'

This commit is contained in:
Charles J. Cliffe 2016-06-01 19:47:06 -04:00
commit 9a16ed7adf
13 changed files with 83 additions and 51 deletions

View File

@ -50,7 +50,7 @@ void IOThread::setInputQueue(std::string qname, ThreadQueueBase *threadQueue) {
this->onBindInput(qname, threadQueue); this->onBindInput(qname, threadQueue);
}; };
void *IOThread::getInputQueue(std::string qname) { ThreadQueueBase *IOThread::getInputQueue(std::string qname) {
return input_queues[qname]; return input_queues[qname];
}; };
@ -59,7 +59,7 @@ void IOThread::setOutputQueue(std::string qname, ThreadQueueBase *threadQueue) {
this->onBindOutput(qname, threadQueue); this->onBindOutput(qname, threadQueue);
}; };
void *IOThread::getOutputQueue(std::string qname) { ThreadQueueBase *IOThread::getOutputQueue(std::string qname) {
return output_queues[qname]; return output_queues[qname];
}; };

View File

@ -20,22 +20,29 @@ struct map_string_less : public std::binary_function<std::string,std::string,boo
class ReferenceCounter { class ReferenceCounter {
public: public:
mutable std::mutex m_mutex;
void setRefCount(int rc) { void setRefCount(int rc) {
refCount.store(rc); std::lock_guard < std::recursive_mutex > lock(m_mutex);
refCount = rc;
} }
void decRefCount() { void decRefCount() {
refCount.store(refCount.load()-1); std::lock_guard < std::recursive_mutex > lock(m_mutex);
refCount--;
} }
int getRefCount() { int getRefCount() {
return refCount.load(); std::lock_guard < std::recursive_mutex > lock(m_mutex);
return refCount;
} }
protected: protected:
std::atomic_int refCount; //this is a basic mutex for all ReferenceCounter derivatives operations INCLUDING the counter itself for consistency !
mutable std::recursive_mutex m_mutex;
private:
int refCount;
}; };
@ -50,17 +57,19 @@ public:
} }
BufferType *getBuffer() { BufferType *getBuffer() {
BufferType* buf = NULL; std::lock_guard < std::mutex > lock(m_mutex);
BufferType* buf = nullptr;
for (outputBuffersI = outputBuffers.begin(); outputBuffersI != outputBuffers.end(); outputBuffersI++) { for (outputBuffersI = outputBuffers.begin(); outputBuffersI != outputBuffers.end(); outputBuffersI++) {
if (!buf && (*outputBuffersI)->getRefCount() <= 0) { if (buf == nullptr && (*outputBuffersI)->getRefCount() <= 0) {
buf = (*outputBuffersI); buf = (*outputBuffersI);
(*outputBuffersI)->setRefCount(0); buf->setRefCount(0);
} else if ((*outputBuffersI)->getRefCount() <= 0) { } else if ((*outputBuffersI)->getRefCount() <= 0) {
(*outputBuffersI)->decRefCount(); (*outputBuffersI)->decRefCount();
} }
} }
if (buf) { if (buf != nullptr) {
if (outputBuffers.back()->getRefCount() < -REBUFFER_GC_LIMIT) { if (outputBuffers.back()->getRefCount() < -REBUFFER_GC_LIMIT) {
BufferType *ref = outputBuffers.back(); BufferType *ref = outputBuffers.back();
outputBuffers.pop_back(); outputBuffers.pop_back();
@ -81,6 +90,7 @@ public:
} }
void purge() { void purge() {
std::lock_guard < std::mutex > lock(m_mutex);
while (!outputBuffers.empty()) { while (!outputBuffers.empty()) {
BufferType *ref = outputBuffers.front(); BufferType *ref = outputBuffers.front();
outputBuffers.pop_front(); outputBuffers.pop_front();
@ -91,6 +101,7 @@ private:
std::string bufferId; std::string bufferId;
std::deque<BufferType*> outputBuffers; std::deque<BufferType*> outputBuffers;
typename std::deque<BufferType*>::iterator outputBuffersI; typename std::deque<BufferType*>::iterator outputBuffersI;
mutable std::mutex m_mutex;
}; };
@ -115,9 +126,9 @@ public:
virtual void onBindInput(std::string name, ThreadQueueBase* threadQueue); virtual void onBindInput(std::string name, ThreadQueueBase* threadQueue);
void setInputQueue(std::string qname, ThreadQueueBase *threadQueue); void setInputQueue(std::string qname, ThreadQueueBase *threadQueue);
void *getInputQueue(std::string qname); ThreadQueueBase *getInputQueue(std::string qname);
void setOutputQueue(std::string qname, ThreadQueueBase *threadQueue); void setOutputQueue(std::string qname, ThreadQueueBase *threadQueue);
void *getOutputQueue(std::string qname); ThreadQueueBase *getOutputQueue(std::string qname);
protected: protected:
std::map<std::string, ThreadQueueBase *, map_string_less> input_queues; std::map<std::string, ThreadQueueBase *, map_string_less> input_queues;

View File

@ -379,8 +379,8 @@ void AudioThread::run() {
std::cout << "Audio thread started." << std::endl; std::cout << "Audio thread started." << std::endl;
inputQueue = (AudioThreadInputQueue *)getInputQueue("AudioDataInput"); inputQueue = static_cast<AudioThreadInputQueue *>(getInputQueue("AudioDataInput"));
threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue"); threadQueueNotify = static_cast<DemodulatorThreadCommandQueue*>(getOutputQueue("NotifyQueue"));
while (!terminated) { while (!terminated) {
AudioThreadCommand command; AudioThreadCommand command;

View File

@ -28,7 +28,7 @@ public:
} }
~AudioThreadInput() { ~AudioThreadInput() {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::recursive_mutex > lock(m_mutex);
} }
}; };

View File

@ -90,7 +90,7 @@ public:
} }
~DemodulatorThreadPostIQData() { ~DemodulatorThreadPostIQData() {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::recursive_mutex > lock(m_mutex);
} }
}; };

View File

@ -56,9 +56,9 @@ void DemodulatorPreThread::run() {
ReBuffer<DemodulatorThreadPostIQData> buffers("DemodulatorPreThreadBuffers"); ReBuffer<DemodulatorThreadPostIQData> buffers("DemodulatorPreThreadBuffers");
iqInputQueue = (DemodulatorThreadInputQueue*)getInputQueue("IQDataInput"); iqInputQueue = static_cast<DemodulatorThreadInputQueue*>(getInputQueue("IQDataInput"));
iqOutputQueue = (DemodulatorThreadPostInputQueue*)getOutputQueue("IQDataOutput"); iqOutputQueue = static_cast<DemodulatorThreadPostInputQueue*>(getOutputQueue("IQDataOutput"));
threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue"); threadQueueNotify = static_cast<DemodulatorThreadCommandQueue*>(getOutputQueue("NotifyQueue"));
std::vector<liquid_float_complex> in_buf_data; std::vector<liquid_float_complex> in_buf_data;
std::vector<liquid_float_complex> out_buf_data; std::vector<liquid_float_complex> out_buf_data;

View File

@ -13,7 +13,9 @@
#include <pthread.h> #include <pthread.h>
#endif #endif
DemodulatorThread::DemodulatorThread(DemodulatorInstance *parent) : IOThread(), outputBuffers("DemodulatorThreadBuffers"), squelchLevel(-100), signalLevel(-100), squelchEnabled(false), cModem(nullptr), cModemKit(nullptr), iqInputQueue(NULL), audioOutputQueue(NULL), audioVisOutputQueue(NULL), threadQueueControl(NULL), threadQueueNotify(NULL) { DemodulatorThread::DemodulatorThread(DemodulatorInstance *parent)
: IOThread(), outputBuffers("DemodulatorThreadBuffers"), squelchLevel(-100),
signalLevel(-100), squelchEnabled(false) {
demodInstance = parent; demodInstance = parent;
muted.store(false); muted.store(false);
@ -26,7 +28,11 @@ DemodulatorThread::~DemodulatorThread() {
void DemodulatorThread::onBindOutput(std::string name, ThreadQueueBase *threadQueue) { void DemodulatorThread::onBindOutput(std::string name, ThreadQueueBase *threadQueue) {
if (name == "AudioVisualOutput") { if (name == "AudioVisualOutput") {
audioVisOutputQueue = (DemodulatorThreadOutputQueue *)threadQueue;
//protects because it may be changed at runtime
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue);
audioVisOutputQueue = static_cast<DemodulatorThreadOutputQueue*>(threadQueue);
} }
} }
@ -63,10 +69,10 @@ void DemodulatorThread::run() {
std::cout << "Demodulator thread started.." << std::endl; std::cout << "Demodulator thread started.." << std::endl;
iqInputQueue = (DemodulatorThreadPostInputQueue*)getInputQueue("IQDataInput"); iqInputQueue = static_cast<DemodulatorThreadPostInputQueue*>(getInputQueue("IQDataInput"));
audioOutputQueue = (AudioThreadInputQueue*)getOutputQueue("AudioDataOutput"); audioOutputQueue = static_cast<AudioThreadInputQueue*>(getOutputQueue("AudioDataOutput"));
threadQueueControl = (DemodulatorThreadControlCommandQueue *)getInputQueue("ControlQueue"); threadQueueControl = static_cast<DemodulatorThreadControlCommandQueue *>(getInputQueue("ControlQueue"));
threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue"); threadQueueNotify = static_cast<DemodulatorThreadCommandQueue*>(getOutputQueue("NotifyQueue"));
ModemIQData modemData; ModemIQData modemData;
@ -119,7 +125,7 @@ void DemodulatorThread::run() {
modemData.data.assign(inputData->begin(), inputData->end()); modemData.data.assign(inputData->begin(), inputData->end());
modemData.setRefCount(1); modemData.setRefCount(1);
AudioThreadInput *ati = NULL; AudioThreadInput *ati = nullptr;
ModemAnalog *modemAnalog = (cModem->getType() == "analog")?((ModemAnalog *)cModem):nullptr; ModemAnalog *modemAnalog = (cModem->getType() == "analog")?((ModemAnalog *)cModem):nullptr;
ModemDigital *modemDigital = (cModem->getType() == "digital")?((ModemDigital *)cModem):nullptr; ModemDigital *modemDigital = (cModem->getType() == "digital")?((ModemDigital *)cModem):nullptr;
@ -159,7 +165,7 @@ void DemodulatorThread::run() {
} }
} }
if (audioOutputQueue != NULL && ati && !squelched) { if (audioOutputQueue != nullptr && ati && !squelched) {
std::vector<float>::iterator data_i; std::vector<float>::iterator data_i;
ati->peak = 0; ati->peak = 0;
for (data_i = ati->data.begin(); data_i != ati->data.end(); data_i++) { for (data_i = ati->data.begin(); data_i != ati->data.end(); data_i++) {
@ -173,8 +179,17 @@ void DemodulatorThread::run() {
ati = nullptr; ati = nullptr;
} }
if (ati && audioVisOutputQueue != NULL && audioVisOutputQueue->empty()) { //At that point, capture the current state of audioVisOutputQueue in a local
//variable, and works with it with now on until the next while-turn.
DemodulatorThreadOutputQueue* localAudioVisOutputQueue = nullptr;
{
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue);
localAudioVisOutputQueue = audioVisOutputQueue;
}
if (ati && localAudioVisOutputQueue != nullptr && localAudioVisOutputQueue->empty()) {
AudioThreadInput *ati_vis = audioVisBuffers.getBuffer(); AudioThreadInput *ati_vis = audioVisBuffers.getBuffer();
ati_vis->setRefCount(1); ati_vis->setRefCount(1);
ati_vis->sampleRate = inp->sampleRate; ati_vis->sampleRate = inp->sampleRate;
ati_vis->inputRate = inp->sampleRate; ati_vis->inputRate = inp->sampleRate;
@ -230,11 +245,11 @@ void DemodulatorThread::run() {
ati_vis->type = 0; ati_vis->type = 0;
} }
audioVisOutputQueue->push(ati_vis); localAudioVisOutputQueue->push(ati_vis);
} }
if (ati != NULL) { if (ati != nullptr) {
if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) { if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) {
audioOutputQueue->push(ati); audioOutputQueue->push(ati);
} else { } else {
@ -266,7 +281,10 @@ void DemodulatorThread::run() {
outputBuffers.purge(); outputBuffers.purge();
if (audioVisOutputQueue && !audioVisOutputQueue->empty()) { //Guard the cleanup of audioVisOutputQueue properly.
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue);
if (audioVisOutputQueue != nullptr && !audioVisOutputQueue->empty()) {
AudioThreadInput *dummy_vis; AudioThreadInput *dummy_vis;
audioVisOutputQueue->pop(dummy_vis); audioVisOutputQueue->pop(dummy_vis);
} }

View File

@ -40,8 +40,8 @@ protected:
float abMagnitude(double alpha, double beta, float inphase, float quadrature); float abMagnitude(double alpha, double beta, float inphase, float quadrature);
float linearToDb(float linear); float linearToDb(float linear);
DemodulatorInstance *demodInstance; DemodulatorInstance *demodInstance = nullptr;
ReBuffer<AudioThreadInput> outputBuffers; ReBuffer<AudioThreadInput> outputBuffers = nullptr;
std::atomic_bool muted; std::atomic_bool muted;
@ -49,12 +49,15 @@ protected:
std::atomic<float> signalLevel; std::atomic<float> signalLevel;
bool squelchEnabled, squelchBreak; bool squelchEnabled, squelchBreak;
Modem *cModem; Modem *cModem = nullptr;
ModemKit *cModemKit; ModemKit *cModemKit = nullptr;
DemodulatorThreadPostInputQueue* iqInputQueue; DemodulatorThreadPostInputQueue* iqInputQueue = nullptr;
AudioThreadInputQueue *audioOutputQueue; AudioThreadInputQueue *audioOutputQueue = nullptr;
DemodulatorThreadOutputQueue* audioVisOutputQueue; DemodulatorThreadOutputQueue* audioVisOutputQueue = nullptr;
DemodulatorThreadControlCommandQueue *threadQueueControl; DemodulatorThreadControlCommandQueue *threadQueueControl = nullptr;
DemodulatorThreadCommandQueue* threadQueueNotify; DemodulatorThreadCommandQueue* threadQueueNotify = nullptr;
//protects the audioVisOutputQueue dynamic binding change at runtime (in DemodulatorMgr)
mutable std::mutex m_mutexAudioVisOutputQueue;
}; };

View File

@ -14,8 +14,8 @@ void DemodulatorWorkerThread::run() {
std::cout << "Demodulator worker thread started.." << std::endl; std::cout << "Demodulator worker thread started.." << std::endl;
commandQueue = (DemodulatorThreadWorkerCommandQueue *)getInputQueue("WorkerCommandQueue"); commandQueue = static_cast<DemodulatorThreadWorkerCommandQueue *>(getInputQueue("WorkerCommandQueue"));
resultQueue = (DemodulatorThreadWorkerResultQueue *)getOutputQueue("WorkerResultQueue"); resultQueue = static_cast<DemodulatorThreadWorkerResultQueue *>(getOutputQueue("WorkerResultQueue"));
while (!terminated) { while (!terminated) {
bool filterChanged = false; bool filterChanged = false;

View File

@ -32,7 +32,7 @@ public:
} }
~ModemIQData() { ~ModemIQData() {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::recursive_mutex > lock(m_mutex);
} }
}; };

View File

@ -24,8 +24,8 @@ SpectrumVisualProcessor *FFTVisualDataThread::getProcessor() {
} }
void FFTVisualDataThread::run() { void FFTVisualDataThread::run() {
DemodulatorThreadInputQueue *pipeIQDataIn = (DemodulatorThreadInputQueue *)getInputQueue("IQDataInput"); DemodulatorThreadInputQueue *pipeIQDataIn = static_cast<DemodulatorThreadInputQueue *>(getInputQueue("IQDataInput"));
SpectrumVisualDataQueue *pipeFFTDataOut = (SpectrumVisualDataQueue *)getOutputQueue("FFTDataOutput"); SpectrumVisualDataQueue *pipeFFTDataOut = static_cast<SpectrumVisualDataQueue *>(getOutputQueue("FFTDataOutput"));
fftQueue.set_max_num_items(100); fftQueue.set_max_num_items(100);
pipeFFTDataOut->set_max_num_items(100); pipeFFTDataOut->set_max_num_items(100);

View File

@ -161,10 +161,10 @@ void SDRPostThread::run() {
std::cout << "SDR post-processing thread started.." << std::endl; std::cout << "SDR post-processing thread started.." << std::endl;
iqDataInQueue = (SDRThreadIQDataQueue*)getInputQueue("IQDataInput"); iqDataInQueue = static_cast<SDRThreadIQDataQueue*>(getInputQueue("IQDataInput"));
iqDataOutQueue = (DemodulatorThreadInputQueue*)getOutputQueue("IQDataOutput"); iqDataOutQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQDataOutput"));
iqVisualQueue = (DemodulatorThreadInputQueue*)getOutputQueue("IQVisualDataOutput"); iqVisualQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQVisualDataOutput"));
iqActiveDemodVisualQueue = (DemodulatorThreadInputQueue*)getOutputQueue("IQActiveDemodVisualDataOutput"); iqActiveDemodVisualQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQActiveDemodVisualDataOutput"));
iqDataInQueue->set_max_num_items(0); iqDataInQueue->set_max_num_items(0);

View File

@ -216,7 +216,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
} }
void SDRThread::readLoop() { void SDRThread::readLoop() {
SDRThreadIQDataQueue* iqDataOutQueue = (SDRThreadIQDataQueue*) getOutputQueue("IQDataOutput"); SDRThreadIQDataQueue* iqDataOutQueue = static_cast<SDRThreadIQDataQueue*>( getOutputQueue("IQDataOutput"));
if (iqDataOutQueue == NULL) { if (iqDataOutQueue == NULL) {
return; return;