From 3f90cbb8580e92af372176922145d57fc3efa9a5 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Mon, 28 Aug 2017 20:31:07 +0200 Subject: [PATCH] Flush queues on terminate() calls to unblock push()s and so ease threads termination --- src/demod/DemodulatorInstance.cpp | 58 +++++++++++++++---------- src/demod/DemodulatorPreThread.cpp | 4 ++ src/demod/DemodulatorThread.cpp | 8 +++- src/demod/DemodulatorWorkerThread.cpp | 3 ++ src/process/ScopeVisualProcessor.cpp | 8 +--- src/process/SpectrumVisualProcessor.cpp | 6 +-- src/sdr/SDRPostThread.cpp | 10 +++-- src/util/ThreadBlockingQueue.h | 4 +- 8 files changed, 60 insertions(+), 41 deletions(-) diff --git a/src/demod/DemodulatorInstance.cpp b/src/demod/DemodulatorInstance.cpp index 3efc504..cb6e63e 100644 --- a/src/demod/DemodulatorInstance.cpp +++ b/src/demod/DemodulatorInstance.cpp @@ -68,10 +68,10 @@ DemodulatorInstance::DemodulatorInstance() { demodulatorPreThread->setInputQueue("IQDataInput",pipeIQInputData); demodulatorPreThread->setOutputQueue("IQDataOutput",pipeIQDemodData); - pipeAudioData = std::make_shared< AudioThreadInputQueue>(); + pipeAudioData = std::make_shared(); pipeAudioData->set_max_num_items(10); - threadQueueControl = std::make_shared< DemodulatorThreadControlCommandQueue>(); + threadQueueControl = std::make_shared(); threadQueueControl->set_max_num_items(2); demodulatorThread = new DemodulatorThread(this); @@ -102,10 +102,10 @@ DemodulatorInstance::~DemodulatorInstance() { #if ENABLE_DIGITAL_LAB delete activeOutput; -#endif - delete audioThread; - delete demodulatorThread; +#endif delete demodulatorPreThread; + delete demodulatorThread; + delete audioThread; break; } @@ -174,10 +174,18 @@ void DemodulatorInstance::terminate() { // std::cout << "Terminating demodulator audio thread.." << std::endl; audioThread->terminate(); + // std::cout << "Terminating demodulator thread.." << std::endl; demodulatorThread->terminate(); + // std::cout << "Terminating demodulator preprocessor thread.." << std::endl; demodulatorPreThread->terminate(); + + //that will actually unblock the currently blocked push(). + pipeIQInputData->flush(); + pipeAudioData->flush(); + pipeIQDemodData->flush(); + threadQueueControl->flush(); } std::string DemodulatorInstance::getLabel() { @@ -197,15 +205,23 @@ bool DemodulatorInstance::isTerminated() { bool demodTerminated = demodulatorThread->isTerminated(); bool preDemodTerminated = demodulatorPreThread->isTerminated(); - //Cleanup the worker threads, if the threads are indeed terminated - if (audioTerminated) { + //Cleanup the worker threads, if the threads are indeed terminated. + // threads are linked as t_PreDemod ==> t_Demod ==> t_Audio + //so terminate in the same order to starve the following threads in succession. + //i.e waiting on timed-pop so able to se their stopping flag. - if (t_Audio) { - t_Audio->join(); + if (preDemodTerminated) { + + if (t_PreDemod) { - delete t_Audio; - t_Audio = nullptr; - } +#ifdef __APPLE__ + pthread_join(t_PreDemod, NULL); +#else + t_PreDemod->join(); + delete t_PreDemod; +#endif + t_PreDemod = nullptr; + } } if (demodTerminated) { @@ -221,18 +237,14 @@ bool DemodulatorInstance::isTerminated() { } } - if (preDemodTerminated) { - - if (t_PreDemod) { + if (audioTerminated) { -#ifdef __APPLE__ - pthread_join(t_PreDemod, NULL); -#else - t_PreDemod->join(); - delete t_PreDemod; -#endif - t_PreDemod = nullptr; - } + if (t_Audio) { + t_Audio->join(); + + delete t_Audio; + t_Audio = nullptr; + } } bool terminated = audioTerminated && demodTerminated && preDemodTerminated; diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index dff7b30..5943c7f 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -352,6 +352,10 @@ void DemodulatorPreThread::terminate() { IOThread::terminate(); workerThread->terminate(); + //unblock the push() + iqOutputQueue->flush(); + iqInputQueue->flush(); + //wait blocking for termination here, it could be long with lots of modems and we MUST terminate properly, //else better kill the whole application... workerThread->isTerminated(5000); diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 64ceaf1..d1bfb56 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -241,7 +241,8 @@ void DemodulatorThread::run() { } if ((ati || modemDigital) && localAudioVisOutputQueue != nullptr && localAudioVisOutputQueue->empty()) { - AudioThreadInputPtr ati_vis(new AudioThreadInput); + + AudioThreadInputPtr ati_vis = std::make_shared(); ati_vis->sampleRate = inp->sampleRate; ati_vis->inputRate = inp->sampleRate; @@ -348,6 +349,11 @@ void DemodulatorThread::run() { void DemodulatorThread::terminate() { IOThread::terminate(); + + //unblock the curretly blocked push() + iqInputQueue->flush(); + audioOutputQueue->flush(); + threadQueueControl->flush(); } bool DemodulatorThread::isMuted() { diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index a2bd35d..53a1a96 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -116,4 +116,7 @@ void DemodulatorWorkerThread::run() { void DemodulatorWorkerThread::terminate() { IOThread::terminate(); + //unblock the push() + resultQueue->flush(); + commandQueue->flush(); } diff --git a/src/process/ScopeVisualProcessor.cpp b/src/process/ScopeVisualProcessor.cpp index 8b204b3..a1eefac 100644 --- a/src/process/ScopeVisualProcessor.cpp +++ b/src/process/ScopeVisualProcessor.cpp @@ -5,10 +5,6 @@ #include #include -//2s -#define MAX_BLOCKING_DURATION_MICROS (2000 * 1000) - - ScopeVisualProcessor::ScopeVisualProcessor(): outputBuffers("ScopeVisualProcessorBuffers") { scopeEnabled.store(true); spectrumEnabled.store(true); @@ -120,7 +116,7 @@ void ScopeVisualProcessor::process() { } renderData->spectrum = false; - distribute(renderData, MAX_BLOCKING_DURATION_MICROS, "renderData"); + distribute(renderData); } if (spectrumEnabled) { @@ -216,7 +212,7 @@ void ScopeVisualProcessor::process() { renderData->fft_size = fftSize/2; renderData->spectrum = true; - distribute(renderData, MAX_BLOCKING_DURATION_MICROS, "renderData"); + distribute(renderData); } } //end if try_pop() } diff --git a/src/process/SpectrumVisualProcessor.cpp b/src/process/SpectrumVisualProcessor.cpp index bc29899..de20052 100644 --- a/src/process/SpectrumVisualProcessor.cpp +++ b/src/process/SpectrumVisualProcessor.cpp @@ -7,10 +7,6 @@ //50 ms #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) -//2s -#define MAX_BLOCKING_DURATION_MICROS (2000 * 1000) - - SpectrumVisualProcessor::SpectrumVisualProcessor() : outputBuffers("SpectrumVisualProcessorBuffers") { lastInputBandwidth = 0; lastBandwidth = 0; @@ -596,7 +592,7 @@ void SpectrumVisualProcessor::process() { output->centerFreq = centerFreq; output->bandwidth = bandwidth; - distribute(output, MAX_BLOCKING_DURATION_MICROS, "output"); + distribute(output); } } diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index afdc16f..08f68ea 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -188,10 +188,7 @@ void SDRPostThread::run() { } //end while //Be safe, remove as many elements as possible - if (iqVisualQueue) { - iqVisualQueue->flush(); - } - + iqVisualQueue->flush(); iqDataInQueue->flush(); iqDataOutQueue->flush(); iqActiveDemodVisualQueue->flush(); @@ -201,6 +198,11 @@ void SDRPostThread::run() { void SDRPostThread::terminate() { IOThread::terminate(); + //unblock push() + iqVisualQueue->flush(); + iqDataInQueue->flush(); + iqDataOutQueue->flush(); + iqActiveDemodVisualQueue->flush(); } void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h index 476b205..dd6a697 100644 --- a/src/util/ThreadBlockingQueue.h +++ b/src/util/ThreadBlockingQueue.h @@ -98,7 +98,7 @@ public: std::thread::id currentThreadId = std::this_thread::get_id(); std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << - (timeout * 0.001) << " ms, message: " << errorMessage << std::endl << std::flush; + (timeout * 0.001) << " ms, message: '" << errorMessage << "'" << std::endl << std::flush; } return false; } @@ -150,7 +150,7 @@ public: std::thread::id currentThreadId = std::this_thread::get_id(); std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << - (timeout * 0.001) << " ms, message: " << errorMessage << std::endl << std::flush; + (timeout * 0.001) << " ms, message: '" << errorMessage << "'" << std::endl << std::flush; } return false; }