From 24d06e13d7a2651a743009958fc9361c507740d8 Mon Sep 17 00:00:00 2001 From: "Charles J. Cliffe" Date: Mon, 22 Dec 2014 21:12:13 -0500 Subject: [PATCH] Fix for various threading crashes during stress test --- src/audio/AudioThread.cpp | 37 ++++++++++--- src/demod/DemodDefs.h | 6 +- src/demod/DemodulatorThread.cpp | 3 +- src/sdr/SDRPostThread.cpp | 98 ++++++++++++++++++++++----------- src/sdr/SDRPostThread.h | 2 + 5 files changed, 101 insertions(+), 45 deletions(-) diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index d613cb5..1e95c63 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -67,29 +67,43 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu if (srcmix->currentInput.channels == 0 || !srcmix->currentInput.data) { if (!srcmix->inputQueue->empty()) { + if (srcmix->currentInput.data) { + delete srcmix->currentInput.data; + } srcmix->inputQueue->pop(srcmix->currentInput); + srcmix->audio_queue_ptr = 0; } - continue; + return 0; } if (srcmix->currentInput.channels == 1) { for (int i = 0; i < nBufferFrames; i++) { - if (srcmix->audio_queue_ptr >= srcmix->currentInput.data.size()) { + if (srcmix->audio_queue_ptr >= srcmix->currentInput.data->size()) { + if (srcmix->currentInput.data) { + delete srcmix->currentInput.data; + } srcmix->inputQueue->pop(srcmix->currentInput); srcmix->audio_queue_ptr = 0; } - float v = srcmix->currentInput.data[srcmix->audio_queue_ptr] * src->gain; - out[i * 2] += v; - out[i * 2 + 1] += v; + if (srcmix->currentInput.data && srcmix->currentInput.data->size()) { + float v = (*srcmix->currentInput.data)[srcmix->audio_queue_ptr] * src->gain; + out[i * 2] += v; + out[i * 2 + 1] += v; + } srcmix->audio_queue_ptr++; } } else { for (int i = 0, iMax = src->currentInput.channels * nBufferFrames; i < iMax; i++) { if (srcmix->audio_queue_ptr >= srcmix->currentInput.data.size()) { + if (srcmix->currentInput.data) { + delete srcmix->currentInput.data; + } srcmix->inputQueue->pop(srcmix->currentInput); srcmix->audio_queue_ptr = 0; } - out[i] = out[i] + srcmix->currentInput.data[srcmix->audio_queue_ptr] * src->gain; + if (srcmix->currentInput.data && srcmix->currentInput.data->size()) { + out[i] = out[i] + (*srcmix->currentInput.data)[srcmix->audio_queue_ptr] * src->gain; + } srcmix->audio_queue_ptr++; } } @@ -116,6 +130,7 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu delete src->currentInput.data; } src->inputQueue->pop(src->currentInput); + src->audio_queue_ptr = 0; } return 0; } @@ -132,7 +147,9 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu src->inputQueue->pop(src->currentInput); src->audio_queue_ptr = 0; } - out[i * 2] = out[i * 2 + 1] = (*src->currentInput.data)[src->audio_queue_ptr] * src->gain; + if (src->currentInput.data && src->currentInput.data->size()) { + out[i * 2] = out[i * 2 + 1] = (*src->currentInput.data)[src->audio_queue_ptr] * src->gain; + } src->audio_queue_ptr++; } } else { @@ -144,7 +161,9 @@ static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBu src->inputQueue->pop(src->currentInput); src->audio_queue_ptr = 0; } - out[i] = (*src->currentInput.data)[src->audio_queue_ptr] * src->gain; + if (src->currentInput.data && src->currentInput.data->size()) { + out[i] = (*src->currentInput.data)[src->audio_queue_ptr] * src->gain; + } src->audio_queue_ptr++; } } @@ -305,13 +324,13 @@ void AudioThread::setActive(bool state) { #ifdef __APPLE__ AudioThreadInput dummy; if (state && !active) { - deviceController[parameters.deviceId]->bindThread(this); while (!inputQueue->empty()) { // flush queue inputQueue->pop(dummy); if (dummy.data) { delete dummy.data; } } + deviceController[parameters.deviceId]->bindThread(this); } else if (!state && active) { deviceController[parameters.deviceId]->removeThread(this); while (!inputQueue->empty()) { // flush queue diff --git a/src/demod/DemodDefs.h b/src/demod/DemodDefs.h index 22df172..a9129d2 100644 --- a/src/demod/DemodDefs.h +++ b/src/demod/DemodDefs.h @@ -63,7 +63,7 @@ public: std::atomic *refCount; DemodulatorThreadIQData() : - frequency(0), bandwidth(0), data(NULL), refCount(0) { + frequency(0), bandwidth(0), data(NULL), refCount(NULL) { } @@ -80,8 +80,8 @@ public: void cleanup() { if (refCount) { - (*refCount)--; - if ((*refCount) <= 0) { + refCount->store(refCount->load()-1); + if (refCount->load() == 0) { delete data; data = NULL; delete refCount; diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 8748a78..b0f059d 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -67,8 +67,6 @@ void DemodulatorThread::threadMain() { unsigned int num_written; msresamp_crcf_execute(resampler, &((*inp.data)[0]), bufSize, resampled_data, &num_written); - delete inp.data; - agc_crcf_execute_block(agc, resampled_data, num_written, agc_data); float audio_resample_ratio = inp.audio_resample_ratio; @@ -138,6 +136,7 @@ void DemodulatorThread::threadMain() { } } + delete inp.data; } if (resampler != NULL) { diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index 9b465ae..3b74ae3 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -11,7 +11,7 @@ SDRPostThread::~SDRPostThread() { } void SDRPostThread::bindDemodulator(DemodulatorInstance *demod) { - demodulators.push_back(demod); + demodulators_add.push_back(demod); } void SDRPostThread::removeDemodulator(DemodulatorInstance *demod) { @@ -19,13 +19,7 @@ void SDRPostThread::removeDemodulator(DemodulatorInstance *demod) { return; } - std::vector::iterator i; - - i = std::find(demodulators.begin(), demodulators.end(), demod); - - if (i != demodulators.end()) { - demodulators.erase(i); - } + demodulators_remove.push_back(demod); } void SDRPostThread::setIQDataInQueue(SDRThreadIQDataQueue* iqDataQueue) { @@ -88,48 +82,90 @@ void SDRPostThread::threadMain() { iqVisualQueue.load()->push(visualDataOut); } - std::atomic *c = new std::atomic; - c->store(demodulators.size()); - bool demodActive = false; + if (demodulators_add.size()) { + while (!demodulators_add.empty()) { + demodulators.push_back(demodulators_add.back()); + demodulators_add.pop_back(); + } + } + if (demodulators_remove.size()) { + while (!demodulators_remove.empty()) { + DemodulatorInstance *demod = demodulators_remove.back(); + demodulators_remove.pop_back(); + + std::vector::iterator i = std::find(demodulators.begin(), demodulators.end(), demod); + + if (i != demodulators.end()) { + demodulators.erase(i); + } + } + } + + int activeDemods = 0; + bool pushedData = false; + std::atomic *c = new std::atomic; if (demodulators.size()) { - DemodulatorThreadIQData dummyDataOut; - dummyDataOut.frequency = data_in.frequency; - dummyDataOut.bandwidth = data_in.bandwidth; - DemodulatorThreadIQData demodDataOut; - demodDataOut.frequency = data_in.frequency; - demodDataOut.bandwidth = data_in.bandwidth; - demodDataOut.setRefCount(c); - demodDataOut.data = data_in.data; std::vector::iterator i; for (i = demodulators.begin(); i != demodulators.end(); i++) { DemodulatorInstance *demod = *i; - DemodulatorThreadInputQueue *demodQueue = demod->threadQueueDemod; if (demod->getParams().frequency != data_in.frequency - && abs(data_in.frequency - demod->getParams().frequency) > (int) ((float) ((float) SRATE / 2.0) * 1.15)) { - if (demod->isActive()) { - demod->setActive(false); - demodQueue->push(dummyDataOut); - c->store(c->load() - 1); + && abs(data_in.frequency - demod->getParams().frequency) > (int) ((float) ((float) SRATE / 2.0))) { + continue; + } + activeDemods++; + } + + c->store(activeDemods); + + bool demodActive = false; + + if (demodulators.size()) { + DemodulatorThreadIQData dummyDataOut; + dummyDataOut.frequency = data_in.frequency; + dummyDataOut.bandwidth = data_in.bandwidth; + dummyDataOut.data = NULL; + DemodulatorThreadIQData demodDataOut; + demodDataOut.frequency = data_in.frequency; + demodDataOut.bandwidth = data_in.bandwidth; + demodDataOut.setRefCount(c); + demodDataOut.data = data_in.data; + + std::vector::iterator i; + for (i = demodulators.begin(); i != demodulators.end(); i++) { + DemodulatorInstance *demod = *i; + DemodulatorThreadInputQueue *demodQueue = demod->threadQueueDemod; + + if (demod->getParams().frequency != data_in.frequency + && abs(data_in.frequency - demod->getParams().frequency) > (int) ((float) ((float) SRATE / 2.0))) { + if (demod->isActive()) { + demod->setActive(false); + demodQueue->push(dummyDataOut); + } + } else if (!demod->isActive()) { + demod->setActive(true); + } + + if (!demod->isActive()) { continue; } - } else if (!demod->isActive()) { - demod->setActive(true); - } - demodQueue->push(demodDataOut); - demodActive = true; + demodQueue->push(demodDataOut); + pushedData = true; + } } } - if (!demodActive) { + if (!pushedData) { delete dataOut.data; delete c; } } + + } std::cout << "SDR post-processing thread done." << std::endl; } diff --git a/src/sdr/SDRPostThread.h b/src/sdr/SDRPostThread.h index bd71b76..928cf07 100644 --- a/src/sdr/SDRPostThread.h +++ b/src/sdr/SDRPostThread.h @@ -26,6 +26,8 @@ protected: std::atomic iqVisualQueue; std::vector demodulators; + std::vector demodulators_add; + std::vector demodulators_remove; std::atomic terminated; iirfilt_crcf dcFilter; };