diff --git a/src/AppFrame.cpp b/src/AppFrame.cpp index 6b10723..f44105f 100644 --- a/src/AppFrame.cpp +++ b/src/AppFrame.cpp @@ -806,11 +806,11 @@ void AppFrame::OnMenu(wxCommandEvent& event) { #endif else if (event.GetId() == wxID_SDR_START_STOP) { if (!wxGetApp().getSDRThread()->isTerminated()) { - wxGetApp().stopDevice(true); + wxGetApp().stopDevice(true, 2000); } else { SDRDeviceInfo *dev = wxGetApp().getDevice(); if (dev != nullptr) { - wxGetApp().setDevice(dev); + wxGetApp().setDevice(dev, 0); } } } else if (event.GetId() == wxID_LOW_PERF) { diff --git a/src/CubicSDR.cpp b/src/CubicSDR.cpp index c926d8f..e30b1bc 100644 --- a/src/CubicSDR.cpp +++ b/src/CubicSDR.cpp @@ -133,8 +133,8 @@ long long strToFrequency(std::string freqStr) { } -CubicSDR::CubicSDR() : appframe(NULL), m_glContext(NULL), frequency(0), offset(0), ppm(0), snap(1), sampleRate(DEFAULT_SAMPLE_RATE), - sdrThread(NULL), sdrPostThread(NULL), spectrumVisualThread(NULL), demodVisualThread(NULL), pipeSDRIQData(NULL), pipeIQVisualData(NULL), pipeAudioVisualData(NULL), t_SDR(NULL), t_PostSDR(NULL) { +CubicSDR::CubicSDR() : frequency(0), offset(0), ppm(0), snap(1), sampleRate(DEFAULT_SAMPLE_RATE),agcMode(false) + { sampleRateInitialized.store(false); agcMode.store(true); soloMode.store(false); @@ -289,22 +289,24 @@ int CubicSDR::OnExit() { stopRig(); } #endif - - demodMgr.terminateAll(); - + + //The thread feeding them all should be terminated first, so: std::cout << "Terminating SDR thread.." << std::endl; sdrThread->terminate(); - sdrThread->isTerminated(1000); + sdrThread->isTerminated(3000); if (t_SDR) { t_SDR->join(); delete t_SDR; t_SDR = nullptr; } - + std::cout << "Terminating SDR post-processing thread.." << std::endl; sdrPostThread->terminate(); - + + std::cout << "Terminating All Demodulators.." << std::endl; + demodMgr.terminateAll(); + std::cout << "Terminating Visual Processor threads.." << std::endl; spectrumVisualThread->terminate(); demodVisualThread->terminate(); @@ -542,16 +544,11 @@ void CubicSDR::setSampleRate(long long rate_in) { } } -void CubicSDR::stopDevice(bool store) { - if (store) { - stoppedDev = sdrThread->getDevice(); - } else { - stoppedDev = nullptr; - } - sdrThread->setDevice(nullptr); - +void CubicSDR::stopDevice(bool store, int waitMsForTermination) { + + //Firt we must stop the threads sdrThread->terminate(); - sdrThread->isTerminated(1000); + sdrThread->isTerminated(waitMsForTermination); if (t_SDR) { t_SDR->join(); @@ -559,6 +556,15 @@ void CubicSDR::stopDevice(bool store) { t_SDR = nullptr; } + //Only now we can nullify devices + if (store) { + stoppedDev = sdrThread->getDevice(); + } + else { + stoppedDev = nullptr; + } + + sdrThread->setDevice(nullptr); } void CubicSDR::reEnumerateDevices() { @@ -568,10 +574,10 @@ void CubicSDR::reEnumerateDevices() { t_SDREnum = new std::thread(&SDREnumerator::threadMain, sdrEnum); } -void CubicSDR::setDevice(SDRDeviceInfo *dev) { +void CubicSDR::setDevice(SDRDeviceInfo *dev, int waitMsForTermination) { sdrThread->terminate(); - sdrThread->isTerminated(1000); + sdrThread->isTerminated(waitMsForTermination); if (t_SDR) { t_SDR->join(); diff --git a/src/CubicSDR.h b/src/CubicSDR.h index 767ae4f..07b28cb 100644 --- a/src/CubicSDR.h +++ b/src/CubicSDR.h @@ -98,8 +98,8 @@ public: long long getSampleRate(); std::vector *getDevices(); - void setDevice(SDRDeviceInfo *dev); - void stopDevice(bool store); + void setDevice(SDRDeviceInfo *dev, int waitMsForTermination); + void stopDevice(bool store, int waitMsForTermination); SDRDeviceInfo * getDevice(); ScopeVisualProcessor *getScopeProcessor(); @@ -173,10 +173,10 @@ public: private: int FilterEvent(wxEvent& event); - AppFrame *appframe; + AppFrame *appframe = nullptr; AppConfig config; - PrimaryGLContext *m_glContext; - std::vector *devs; + PrimaryGLContext *m_glContext = nullptr; + std::vector *devs = nullptr; DemodulatorMgr demodMgr; @@ -186,27 +186,31 @@ private: std::atomic_llong sampleRate; std::atomic_bool agcMode; - SDRThread *sdrThread; - SDREnumerator *sdrEnum; - SDRPostThread *sdrPostThread; - SpectrumVisualDataThread *spectrumVisualThread; - SpectrumVisualDataThread *demodVisualThread; + SDRThread *sdrThread = nullptr; + SDREnumerator *sdrEnum = nullptr; + SDRPostThread *sdrPostThread = nullptr; + SpectrumVisualDataThread *spectrumVisualThread = nullptr; + SpectrumVisualDataThread *demodVisualThread = nullptr; - SDRThreadIQDataQueue* pipeSDRIQData; - DemodulatorThreadInputQueue* pipeIQVisualData; - DemodulatorThreadOutputQueue* pipeAudioVisualData; - DemodulatorThreadInputQueue* pipeDemodIQVisualData; - DemodulatorThreadInputQueue* pipeWaterfallIQVisualData; - DemodulatorThreadInputQueue* pipeActiveDemodIQVisualData; + SDRThreadIQDataQueue* pipeSDRIQData = nullptr; + DemodulatorThreadInputQueue* pipeIQVisualData = nullptr; + DemodulatorThreadOutputQueue* pipeAudioVisualData = nullptr; + DemodulatorThreadInputQueue* pipeDemodIQVisualData = nullptr; + DemodulatorThreadInputQueue* pipeWaterfallIQVisualData = nullptr; + DemodulatorThreadInputQueue* pipeActiveDemodIQVisualData = nullptr; ScopeVisualProcessor scopeProcessor; - SDRDevicesDialog *deviceSelectorDialog; + SDRDevicesDialog *deviceSelectorDialog = nullptr; SoapySDR::Kwargs streamArgs; SoapySDR::Kwargs settingArgs; - std::thread *t_SDR, *t_SDREnum, *t_PostSDR, *t_SpectrumVisual, *t_DemodVisual; + std::thread *t_SDR = nullptr; + std::thread *t_SDREnum = nullptr; + std::thread *t_PostSDR = nullptr; + std::thread *t_SpectrumVisual = nullptr; + std::thread *t_DemodVisual = nullptr; std::atomic_bool devicesReady; std::atomic_bool devicesFailed; std::atomic_bool deviceSelectorOpen; @@ -224,8 +228,8 @@ private: std::atomic_bool soloMode; SDRDeviceInfo *stoppedDev; #ifdef USE_HAMLIB - RigThread* rigThread; - std::thread *t_Rig; + RigThread* rigThread = nullptr; + std::thread *t_Rig = nullptr; #endif }; diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index 3910be5..cd49d1b 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -6,13 +6,14 @@ #include "DemodulatorThread.h" #include "DemodulatorInstance.h" #include +#include std::map AudioThread::deviceController; std::map AudioThread::deviceSampleRate; std::map AudioThread::deviceThread; AudioThread::AudioThread() : IOThread(), - currentInput(NULL), inputQueue(NULL), nBufferFrames(1024), sampleRate(0) { + currentInput(nullptr), inputQueue(nullptr), nBufferFrames(1024), sampleRate(0) { audioQueuePtr.store(0); underflowCount.store(0); @@ -29,13 +30,24 @@ AudioThread::~AudioThread() { delete vBoundThreads; } +std::recursive_mutex & AudioThread::getMutex() +{ + return m_mutex; +} + void AudioThread::bindThread(AudioThread *other) { + + std::lock_guard lock(m_mutex); + if (std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other) == boundThreads.load()->end()) { boundThreads.load()->push_back(other); } } void AudioThread::removeThread(AudioThread *other) { + + std::lock_guard lock(m_mutex); + std::vector::iterator i; i = std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other); if (i != boundThreads.load()->end()) { @@ -44,6 +56,7 @@ void AudioThread::removeThread(AudioThread *other) { } void AudioThread::deviceCleanup() { + std::map::iterator i; for (i = deviceController.begin(); i != deviceController.end(); i++) { @@ -53,53 +66,67 @@ void AudioThread::deviceCleanup() { static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status, void *userData) { - AudioThread *src = (AudioThread *) userData; - float *out = (float*) outputBuffer; + + float *out = (float*)outputBuffer; + + //Zero output buffer in all cases: this allow to mute audio if no AudioThread data is + //actually active. memset(out, 0, nBufferFrames * 2 * sizeof(float)); + AudioThread *src = (AudioThread *) userData; + + std::lock_guard lock(src->getMutex()); + if (src->isTerminated()) { return 1; } if (status) { - std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl; + std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl; } if (src->boundThreads.load()->empty()) { - return 0; + return 0; } - float peak = 0.0; - + + double peak = 0.0; + + //for all boundThreads for (size_t j = 0; j < src->boundThreads.load()->size(); j++) { + AudioThread *srcmix = (*(src->boundThreads.load()))[j]; + + //lock every single boundThread srcmix in succession the time we process + //its audio samples. + std::lock_guard lock(srcmix->getMutex()); + if (srcmix->isTerminated() || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) { continue; } if (!srcmix->currentInput) { srcmix->audioQueuePtr = 0; - if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { - continue; - } - srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->isTerminated()) { + + if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) { continue; } + continue; } if (srcmix->currentInput->sampleRate != src->getSampleRate()) { - while (srcmix->inputQueue->size()) { - srcmix->inputQueue->pop(srcmix->currentInput); + + while (srcmix->inputQueue->try_pop(srcmix->currentInput)) { + if (srcmix->currentInput) { if (srcmix->currentInput->sampleRate == src->getSampleRate()) { break; } srcmix->currentInput->decRefCount(); } - srcmix->currentInput = NULL; - } + srcmix->currentInput = nullptr; + } //end while srcmix->audioQueuePtr = 0; @@ -114,37 +141,35 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { srcmix->currentInput->decRefCount(); - srcmix->currentInput = NULL; + srcmix->currentInput = nullptr; } - if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { + + if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) { continue; - } - srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->isTerminated()) { - continue; - } + } } continue; } - float mixPeak = srcmix->currentInput->peak * srcmix->gain; + double mixPeak = srcmix->currentInput->peak * srcmix->gain; if (srcmix->currentInput->channels == 1) { + for (unsigned int i = 0; i < nBufferFrames; i++) { + if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) { srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { srcmix->currentInput->decRefCount(); - srcmix->currentInput = NULL; + srcmix->currentInput = nullptr; } - if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { + + if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) { break; } - srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->isTerminated()) { - break; - } - float srcPeak = srcmix->currentInput->peak * srcmix->gain; + + + double srcPeak = srcmix->currentInput->peak * srcmix->gain; if (mixPeak < srcPeak) { mixPeak = srcPeak; } @@ -158,25 +183,25 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned } } else { for (int i = 0, iMax = srcmix->currentInput->channels * nBufferFrames; i < iMax; i++) { + if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) { srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { srcmix->currentInput->decRefCount(); - srcmix->currentInput = NULL; + srcmix->currentInput = nullptr; } - if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { + + if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) { break; } - srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->isTerminated()) { - break; - } - float srcPeak = srcmix->currentInput->peak * srcmix->gain; + + double srcPeak = srcmix->currentInput->peak * srcmix->gain; if (mixPeak < srcPeak) { mixPeak = srcPeak; } } if (srcmix->currentInput && srcmix->currentInput->data.size()) { + out[i] = out[i] + srcmix->currentInput->data[srcmix->audioQueuePtr] * srcmix->gain; } srcmix->audioQueuePtr++; @@ -186,11 +211,15 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned peak += mixPeak; } + //normalize volume if (peak > 1.0) { + float invPeak = (float)(1.0 / peak); + for (unsigned int i = 0; i < nBufferFrames * 2; i++) { - out[i] /= peak; + out[i] *= invPeak; } } + return 0; } @@ -247,6 +276,7 @@ void AudioThread::enumerateDevices(std::vector &devs) { } void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { + if (deviceController.find(deviceId) != deviceController.end()) { AudioThreadCommand refreshDevice; refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE; @@ -261,6 +291,8 @@ void AudioThread::setSampleRate(int sampleRate) { dac.stopStream(); dac.closeStream(); + + std::lock_guard lock(m_mutex); for (size_t j = 0; j < boundThreads.load()->size(); j++) { AudioThread *srcmix = (*(boundThreads.load()))[j]; @@ -323,6 +355,7 @@ void AudioThread::setupDevice(int deviceId) { deviceThread[parameters.deviceId] = new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId]); } else if (deviceController[parameters.deviceId] == this) { + //Attach callback dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *) this, &opts); dac.startStream(); } else { @@ -379,6 +412,7 @@ void AudioThread::run() { inputQueue = static_cast(getInputQueue("AudioDataInput")); + //Infinite loop, witing for commands or for termination while (!stopping) { AudioThreadCommand command; cmdQueue.pop(command); @@ -391,20 +425,26 @@ void AudioThread::run() { } } - // Drain any remaining inputs - if (inputQueue) while (!inputQueue->empty()) { - AudioThreadInput *ref; - inputQueue->pop(ref); + //Thread termination, prevent fancy things to happen, lock the whole thing: + //This way audioThreadCallback is rightly protected from thread termination + std::lock_guard lock(m_mutex); + + // Drain any remaining inputs, with a non-blocking pop + AudioThreadInput *ref; + while (inputQueue && inputQueue->try_pop(ref)) { + if (ref) { ref->decRefCount(); } - } + } //end while + //Nullify currentInput... if (currentInput) { currentInput->setRefCount(0); currentInput = nullptr; } + //Stop if (deviceController[parameters.deviceId] != this) { deviceController[parameters.deviceId]->removeThread(this); } else { @@ -444,8 +484,9 @@ void AudioThread::setActive(bool state) { // Activity state changing, clear any inputs if(inputQueue) { - while (!inputQueue->empty()) { // flush queue - inputQueue->pop(dummy); + + while (inputQueue->try_pop(dummy)) { // flush queue, non-blocking pop + if (dummy) { dummy->decRefCount(); } diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index aa664ac..61008f9 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -88,7 +88,13 @@ private: AudioThreadCommandQueue cmdQueue; int sampleRate; + //The own m_mutex protecting this AudioThread + std::recursive_mutex m_mutex; + public: + //give access to the this AudioThread lock + std::recursive_mutex& getMutex(); + void bindThread(AudioThread *other); void removeThread(AudioThread *other); diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index 69c9e34..da18d32 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -210,12 +210,11 @@ void DemodulatorPreThread::run() { inp->decRefCount(); - if (!stopping && !workerResults->empty()) { - while (!workerResults->empty()) { - DemodulatorWorkerThreadResult result; - workerResults->pop(result); - - switch (result.cmd) { + DemodulatorWorkerThreadResult result; + //process all worker results until + while (!stopping && workerResults->try_pop(result)) { + + switch (result.cmd) { case DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS: if (result.iqResampler) { if (iqResampler) { @@ -258,20 +257,19 @@ void DemodulatorPreThread::run() { break; default: break; - } } - } + } //end while if ((cModem != nullptr) && modemSettingsChanged.load()) { cModem->writeSettings(modemSettingsBuffered); modemSettingsBuffered.clear(); modemSettingsChanged.store(false); } - } + } //end while stopping - while (!iqOutputQueue->empty()) { - DemodulatorThreadPostIQData *tmp; - iqOutputQueue->pop(tmp); + DemodulatorThreadPostIQData *tmp; + while (iqOutputQueue->try_pop(tmp)) { + tmp->decRefCount(); } buffers.purge(); diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 391c092..bc2fb9f 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -250,43 +250,45 @@ void DemodulatorThread::run() { } } - if (!threadQueueControl->empty()) { - while (!threadQueueControl->empty()) { - DemodulatorThreadControlCommand command; - threadQueueControl->pop(command); - - switch (command.cmd) { - case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON: - squelchEnabled = true; - break; - case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_OFF: - squelchEnabled = false; - break; - default: - break; - } + DemodulatorThreadControlCommand command; + + //empty command queue, execute commands + while (threadQueueControl->try_pop(command)) { + + switch (command.cmd) { + case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON: + squelchEnabled = true; + break; + case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_OFF: + squelchEnabled = false; + break; + default: + break; } } + inp->decRefCount(); } // end while !stopping - // Purge any unused inputs - while (!iqInputQueue->empty()) { - DemodulatorThreadPostIQData *ref; - iqInputQueue->pop(ref); + // Purge any unused inputs, with a non-blocking pop + DemodulatorThreadPostIQData *ref; + while (iqInputQueue->try_pop(ref)) { + if (ref) { // May have other consumers; just decrement ref->decRefCount(); } } - while (!audioOutputQueue->empty()) { - AudioThreadInput *ref; - audioOutputQueue->pop(ref); - if (ref) { // Originated here; set RefCount to 0 - ref->setRefCount(0); + + AudioThreadInput *ref_audio; + while (audioOutputQueue->try_pop(ref_audio)) { + + if (ref_audio) { // Originated here; set RefCount to 0 + ref_audio->setRefCount(0); } } + outputBuffers.purge(); // std::cout << "Demodulator thread done." << std::endl; diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index 4fa600b..c5c59e8 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -24,8 +24,12 @@ void DemodulatorWorkerThread::run() { DemodulatorWorkerThreadCommand command; bool done = false; + //Beware of the subtility here, + //we are waiting for the first command to show up (blocking!) + //then consuming the commands until done. while (!done) { commandQueue->pop(command); + switch (command.cmd) { case DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS: filterChanged = true; diff --git a/src/forms/SDRDevices/SDRDevices.cpp b/src/forms/SDRDevices/SDRDevices.cpp index d22e5b9..df81fb8 100644 --- a/src/forms/SDRDevices/SDRDevices.cpp +++ b/src/forms/SDRDevices/SDRDevices.cpp @@ -314,7 +314,7 @@ void SDRDevicesDialog::OnUseSelected( wxMouseEvent& event) { devConfig->setStreamOpts(streamArgs); wxGetApp().setDeviceArgs(settingArgs); wxGetApp().setStreamArgs(streamArgs); - wxGetApp().setDevice(dev); + wxGetApp().setDevice(dev,0); Close(); } @@ -483,7 +483,7 @@ void SDRDevicesDialog::doRefreshDevices() { editId = nullptr; removeId = nullptr; dev = nullptr; - wxGetApp().stopDevice(false); + wxGetApp().stopDevice(false, 2000); devTree->DeleteAllItems(); devTree->Disable(); m_propertyGrid->Clear(); diff --git a/src/process/FFTDataDistributor.cpp b/src/process/FFTDataDistributor.cpp index c4c5632..140a50f 100644 --- a/src/process/FFTDataDistributor.cpp +++ b/src/process/FFTDataDistributor.cpp @@ -17,6 +17,7 @@ unsigned int FFTDataDistributor::getLinesPerSecond() { } void FFTDataDistributor::process() { + while (!input->empty()) { if (!isAnyOutputEmpty()) { return; diff --git a/src/process/ScopeVisualProcessor.cpp b/src/process/ScopeVisualProcessor.cpp index 97715d4..9815a77 100644 --- a/src/process/ScopeVisualProcessor.cpp +++ b/src/process/ScopeVisualProcessor.cpp @@ -68,10 +68,10 @@ void ScopeVisualProcessor::process() { if (!isOutputEmpty()) { return; } - if (!input->empty()) { - AudioThreadInput *audioInputData; - input->pop(audioInputData); - + AudioThreadInput *audioInputData; + + if (input->try_pop(audioInputData)) { + if (!audioInputData) { return; } @@ -271,5 +271,5 @@ void ScopeVisualProcessor::process() { } else { delete audioInputData; //->decRefCount(); } - } + } //end if try_pop() } diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index f730375..2a2c6a7 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -215,11 +215,14 @@ void SDRPostThread::run() { if (doUpdate) { updateActiveDemodulators(); } - } + } //end while - if (iqVisualQueue && !iqVisualQueue->empty()) { - DemodulatorThreadIQData *visualDataDummy; - iqVisualQueue->pop(visualDataDummy); + //TODO: Why only 1 element was removed before ? + DemodulatorThreadIQData *visualDataDummy; + while (iqVisualQueue && iqVisualQueue->try_pop(visualDataDummy)) { + //nothing + //TODO: What about the refcounts ? + } // buffers.purge(); diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index db450f9..81f4df2 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -162,6 +162,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { int nElems = numElems.load(); int mtElems = mtuElems.load(); + //If overflow occured on the previous readStream(), transfer it in inpBuffer now if (numOverflow > 0) { int n_overflow = numOverflow; if (n_overflow > nElems) { @@ -176,9 +177,18 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { } } + //attempt readStream() at most nElems, by mtElems-sized chunks, append inpBuffer. while (n_read < nElems && !stopping) { int n_requested = nElems-n_read; + int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs); + + //if the n_stream_read <= 0, bail out from reading. + if (n_stream_read <= 0) { + break; + } + + //sucess read beyond nElems, with overflow if ((n_read + n_stream_read) > nElems) { memcpy(&inpBuffer.data[n_read], buffs[0], n_requested * sizeof(float) * 2); numOverflow = n_stream_read-n_requested; diff --git a/src/visual/ScopeCanvas.cpp b/src/visual/ScopeCanvas.cpp index 2aa6e22..b289703 100644 --- a/src/visual/ScopeCanvas.cpp +++ b/src/visual/ScopeCanvas.cpp @@ -97,11 +97,10 @@ bool ScopeCanvas::getShowDb() { void ScopeCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { wxPaintDC dc(this); const wxSize ClientSize = GetClientSize(); - - while (!inputData.empty()) { - ScopeRenderData *avData; - inputData.pop(avData); - + + ScopeRenderData *avData; + while (inputData.try_pop(avData)) { + if (!avData->spectrum) { scopePanel.setMode(avData->mode); diff --git a/src/visual/SpectrumCanvas.cpp b/src/visual/SpectrumCanvas.cpp index 678b0c1..8a31c85 100644 --- a/src/visual/SpectrumCanvas.cpp +++ b/src/visual/SpectrumCanvas.cpp @@ -51,11 +51,9 @@ void SpectrumCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { wxPaintDC dc(this); const wxSize ClientSize = GetClientSize(); - if (!visualDataQueue.empty()) { - SpectrumVisualData *vData; - - visualDataQueue.pop(vData); - + SpectrumVisualData *vData; + if (visualDataQueue.try_pop(vData)) { + if (vData) { spectrumPanel.setPoints(vData->spectrum_points); spectrumPanel.setPeakPoints(vData->spectrum_hold_points); diff --git a/src/visual/WaterfallCanvas.cpp b/src/visual/WaterfallCanvas.cpp index 5d0e208..26816f4 100644 --- a/src/visual/WaterfallCanvas.cpp +++ b/src/visual/WaterfallCanvas.cpp @@ -95,8 +95,8 @@ void WaterfallCanvas::processInputQueue() { if (lpsIndex >= targetVis) { while (lpsIndex >= targetVis) { SpectrumVisualData *vData; - if (!visualDataQueue.empty()) { - visualDataQueue.pop(vData); + + if (visualDataQueue.try_pop(vData)) { if (vData) { if (vData->spectrum_points.size() == fft_size * 2) { @@ -912,11 +912,13 @@ void WaterfallCanvas::updateCenterFrequency(long long freq) { void WaterfallCanvas::setLinesPerSecond(int lps) { std::lock_guard < std::mutex > lock(tex_update); + linesPerSecond = lps; - while (!visualDataQueue.empty()) { - SpectrumVisualData *vData; - visualDataQueue.pop(vData); + //empty all + SpectrumVisualData *vData; + while (visualDataQueue.try_pop(vData)) { + if (vData) { vData->decRefCount(); }