Flush queues on terminate() calls to unblock push()s and so ease threads termination

This commit is contained in:
vsonnier 2017-08-28 20:31:07 +02:00
parent 2d01a279e9
commit 3f90cbb858
8 changed files with 60 additions and 41 deletions

View File

@ -68,10 +68,10 @@ DemodulatorInstance::DemodulatorInstance() {
demodulatorPreThread->setInputQueue("IQDataInput",pipeIQInputData);
demodulatorPreThread->setOutputQueue("IQDataOutput",pipeIQDemodData);
pipeAudioData = std::make_shared< AudioThreadInputQueue>();
pipeAudioData = std::make_shared<AudioThreadInputQueue>();
pipeAudioData->set_max_num_items(10);
threadQueueControl = std::make_shared< DemodulatorThreadControlCommandQueue>();
threadQueueControl = std::make_shared<DemodulatorThreadControlCommandQueue>();
threadQueueControl->set_max_num_items(2);
demodulatorThread = new DemodulatorThread(this);
@ -102,10 +102,10 @@ DemodulatorInstance::~DemodulatorInstance() {
#if ENABLE_DIGITAL_LAB
delete activeOutput;
#endif
delete audioThread;
delete demodulatorThread;
#endif
delete demodulatorPreThread;
delete demodulatorThread;
delete audioThread;
break;
}
@ -174,10 +174,18 @@ void DemodulatorInstance::terminate() {
// std::cout << "Terminating demodulator audio thread.." << std::endl;
audioThread->terminate();
// std::cout << "Terminating demodulator thread.." << std::endl;
demodulatorThread->terminate();
// std::cout << "Terminating demodulator preprocessor thread.." << std::endl;
demodulatorPreThread->terminate();
//that will actually unblock the currently blocked push().
pipeIQInputData->flush();
pipeAudioData->flush();
pipeIQDemodData->flush();
threadQueueControl->flush();
}
std::string DemodulatorInstance::getLabel() {
@ -197,15 +205,23 @@ bool DemodulatorInstance::isTerminated() {
bool demodTerminated = demodulatorThread->isTerminated();
bool preDemodTerminated = demodulatorPreThread->isTerminated();
//Cleanup the worker threads, if the threads are indeed terminated
if (audioTerminated) {
//Cleanup the worker threads, if the threads are indeed terminated.
// threads are linked as t_PreDemod ==> t_Demod ==> t_Audio
//so terminate in the same order to starve the following threads in succession.
//i.e waiting on timed-pop so able to se their stopping flag.
if (t_Audio) {
t_Audio->join();
if (preDemodTerminated) {
if (t_PreDemod) {
delete t_Audio;
t_Audio = nullptr;
}
#ifdef __APPLE__
pthread_join(t_PreDemod, NULL);
#else
t_PreDemod->join();
delete t_PreDemod;
#endif
t_PreDemod = nullptr;
}
}
if (demodTerminated) {
@ -221,18 +237,14 @@ bool DemodulatorInstance::isTerminated() {
}
}
if (preDemodTerminated) {
if (t_PreDemod) {
if (audioTerminated) {
#ifdef __APPLE__
pthread_join(t_PreDemod, NULL);
#else
t_PreDemod->join();
delete t_PreDemod;
#endif
t_PreDemod = nullptr;
}
if (t_Audio) {
t_Audio->join();
delete t_Audio;
t_Audio = nullptr;
}
}
bool terminated = audioTerminated && demodTerminated && preDemodTerminated;

View File

@ -352,6 +352,10 @@ void DemodulatorPreThread::terminate() {
IOThread::terminate();
workerThread->terminate();
//unblock the push()
iqOutputQueue->flush();
iqInputQueue->flush();
//wait blocking for termination here, it could be long with lots of modems and we MUST terminate properly,
//else better kill the whole application...
workerThread->isTerminated(5000);

View File

@ -241,7 +241,8 @@ void DemodulatorThread::run() {
}
if ((ati || modemDigital) && localAudioVisOutputQueue != nullptr && localAudioVisOutputQueue->empty()) {
AudioThreadInputPtr ati_vis(new AudioThreadInput);
AudioThreadInputPtr ati_vis = std::make_shared<AudioThreadInput>();
ati_vis->sampleRate = inp->sampleRate;
ati_vis->inputRate = inp->sampleRate;
@ -348,6 +349,11 @@ void DemodulatorThread::run() {
void DemodulatorThread::terminate() {
IOThread::terminate();
//unblock the curretly blocked push()
iqInputQueue->flush();
audioOutputQueue->flush();
threadQueueControl->flush();
}
bool DemodulatorThread::isMuted() {

View File

@ -116,4 +116,7 @@ void DemodulatorWorkerThread::run() {
void DemodulatorWorkerThread::terminate() {
IOThread::terminate();
//unblock the push()
resultQueue->flush();
commandQueue->flush();
}

View File

@ -5,10 +5,6 @@
#include <cstring>
#include <string>
//2s
#define MAX_BLOCKING_DURATION_MICROS (2000 * 1000)
ScopeVisualProcessor::ScopeVisualProcessor(): outputBuffers("ScopeVisualProcessorBuffers") {
scopeEnabled.store(true);
spectrumEnabled.store(true);
@ -120,7 +116,7 @@ void ScopeVisualProcessor::process() {
}
renderData->spectrum = false;
distribute(renderData, MAX_BLOCKING_DURATION_MICROS, "renderData");
distribute(renderData);
}
if (spectrumEnabled) {
@ -216,7 +212,7 @@ void ScopeVisualProcessor::process() {
renderData->fft_size = fftSize/2;
renderData->spectrum = true;
distribute(renderData, MAX_BLOCKING_DURATION_MICROS, "renderData");
distribute(renderData);
}
} //end if try_pop()
}

View File

@ -7,10 +7,6 @@
//50 ms
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
//2s
#define MAX_BLOCKING_DURATION_MICROS (2000 * 1000)
SpectrumVisualProcessor::SpectrumVisualProcessor() : outputBuffers("SpectrumVisualProcessorBuffers") {
lastInputBandwidth = 0;
lastBandwidth = 0;
@ -596,7 +592,7 @@ void SpectrumVisualProcessor::process() {
output->centerFreq = centerFreq;
output->bandwidth = bandwidth;
distribute(output, MAX_BLOCKING_DURATION_MICROS, "output");
distribute(output);
}
}

View File

@ -188,10 +188,7 @@ void SDRPostThread::run() {
} //end while
//Be safe, remove as many elements as possible
if (iqVisualQueue) {
iqVisualQueue->flush();
}
iqVisualQueue->flush();
iqDataInQueue->flush();
iqDataOutQueue->flush();
iqActiveDemodVisualQueue->flush();
@ -201,6 +198,11 @@ void SDRPostThread::run() {
void SDRPostThread::terminate() {
IOThread::terminate();
//unblock push()
iqVisualQueue->flush();
iqDataInQueue->flush();
iqDataOutQueue->flush();
iqActiveDemodVisualQueue->flush();
}
void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {

View File

@ -98,7 +98,7 @@ public:
std::thread::id currentThreadId = std::this_thread::get_id();
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " <<
(timeout * 0.001) << " ms, message: " << errorMessage << std::endl << std::flush;
(timeout * 0.001) << " ms, message: '" << errorMessage << "'" << std::endl << std::flush;
}
return false;
}
@ -150,7 +150,7 @@ public:
std::thread::id currentThreadId = std::this_thread::get_id();
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " <<
(timeout * 0.001) << " ms, message: " << errorMessage << std::endl << std::flush;
(timeout * 0.001) << " ms, message: '" << errorMessage << "'" << std::endl << std::flush;
}
return false;
}