Even better thread and application termination:

- Now that timed-pop()/try_pop() prevent indefinite blocking it is best to flush() on thread stopping to unblock push()
- Reworked a bit the CubicSDR::OnExit() sequence, see comments.
This commit is contained in:
vsonnier 2017-06-01 19:56:33 +02:00
parent 8f608bbf5c
commit 3604b7f1ff
9 changed files with 59 additions and 36 deletions

View File

@ -393,18 +393,19 @@ int CubicSDR::OnExit() {
std::cout << "Terminating SDR thread.." << std::endl; std::cout << "Terminating SDR thread.." << std::endl;
sdrThread->terminate(); sdrThread->terminate();
sdrThread->isTerminated(3000); sdrThread->isTerminated(3000);
if (t_SDR) {
t_SDR->join();
delete t_SDR;
t_SDR = nullptr;
}
std::cout << "Terminating SDR post-processing thread.." << std::endl; std::cout << "Terminating SDR post-processing thread.." << std::endl;
sdrPostThread->terminate(); sdrPostThread->terminate();
//Wait for termination for sdrPostThread second:: since it is doing
//mostly blocking push() to the other threads, they must stay alive
//so that sdrPostThread can complete a processing loop and die.
sdrPostThread->isTerminated(3000);
std::cout << "Terminating All Demodulators.." << std::endl; std::cout << "Terminating All Demodulators.." << std::endl;
demodMgr.terminateAll(); demodMgr.terminateAll();
//wait for effective death of all demodulators before continuing.
demodMgr.garbageCollect(true);
std::cout << "Terminating Visual Processor threads.." << std::endl; std::cout << "Terminating Visual Processor threads.." << std::endl;
spectrumVisualThread->terminate(); spectrumVisualThread->terminate();
@ -413,18 +414,28 @@ int CubicSDR::OnExit() {
} }
//Wait nicely //Wait nicely
sdrPostThread->isTerminated(1000);
spectrumVisualThread->isTerminated(1000); spectrumVisualThread->isTerminated(1000);
if (demodVisualThread) { if (demodVisualThread) {
demodVisualThread->isTerminated(1000); demodVisualThread->isTerminated(1000);
} }
//Then join the thread themselves //Then join the thread themselves:
if (t_SDR) {
t_SDR->join();
}
t_PostSDR->join(); t_PostSDR->join();
if (t_DemodVisual) t_DemodVisual->join();
if (t_DemodVisual) {
t_DemodVisual->join();
}
t_SpectrumVisual->join(); t_SpectrumVisual->join();
//Now only we can delete //Now only we can delete:
delete t_SDR;
t_SDR = nullptr;
delete sdrThread; delete sdrThread;
sdrThread = nullptr; sdrThread = nullptr;

View File

@ -278,29 +278,36 @@ DemodulatorInstance *DemodulatorMgr::getLastDemodulatorWith(const std::string& t
return nullptr; return nullptr;
} }
void DemodulatorMgr::garbageCollect() { void DemodulatorMgr::garbageCollect(boolean forcedGC) {
std::lock_guard < std::mutex > lock(deleted_demods_busy); std::lock_guard < std::mutex > lock(deleted_demods_busy);
std::vector<DemodulatorInstance *>::iterator it = demods_deleted.begin(); while (!demods_deleted.empty()) {
while (it != demods_deleted.end()) { std::vector<DemodulatorInstance *>::iterator it = demods_deleted.begin();
//make 1 pass over
while (it != demods_deleted.end()) {
if ((*it)->isTerminated()) { if ((*it)->isTerminated()) {
DemodulatorInstance *deleted = (*it); DemodulatorInstance *deleted = (*it);
std::cout << "Garbage collected demodulator instance '" << deleted->getLabel() << "'... " << std::endl << std::flush; std::cout << "Garbage collected demodulator instance '" << deleted->getLabel() << "'... " << std::endl << std::flush;
demods_deleted.erase(it); it = demods_deleted.erase(it);
delete deleted; delete deleted;
//only garbage collect 1 demod at a time. //only garbage collect 1 demod at a time.
return; if (!forcedGC) {
} return;
else { }
it++; }
} else {
} //end while it++;
}
} //end while
//stupid busy-wait loop
std::this_thread::sleep_for(std::chrono::milliseconds(5));
} //end while not empty
} }
void DemodulatorMgr::updateLastState() { void DemodulatorMgr::updateLastState() {

View File

@ -69,7 +69,11 @@ public:
DemodulatorInstance *loadInstance(DataNode *node); DemodulatorInstance *loadInstance(DataNode *node);
//to be called periodically to cleanup removed demodulators. //to be called periodically to cleanup removed demodulators.
void garbageCollect(); //if forcedGC = true, the methods waits until
//all deleted demodulators are effectively GCed.
//else: (default) the method test for effective termination
//and GC one demod per call.
void garbageCollect(boolean forcedGC = false);
private: private:

View File

@ -282,7 +282,7 @@ void DemodulatorPreThread::run() {
iqOutputQueue->flush(); iqOutputQueue->flush();
buffers.purge(); iqInputQueue->flush();
} }
void DemodulatorPreThread::setDemodType(std::string demodType) { void DemodulatorPreThread::setDemodType(std::string demodType) {

View File

@ -342,8 +342,6 @@ void DemodulatorThread::run() {
// Purge any unused inputs, with a non-blocking pop // Purge any unused inputs, with a non-blocking pop
iqInputQueue->flush(); iqInputQueue->flush();
audioOutputQueue->flush(); audioOutputQueue->flush();
outputBuffers.purge();
// std::cout << "Demodulator thread done." << std::endl; // std::cout << "Demodulator thread done." << std::endl;
} }

View File

@ -33,7 +33,7 @@ void DemodulatorWorkerThread::run() {
//Beware of the subtility here, //Beware of the subtility here,
//we are waiting for the first command to show up (blocking!) //we are waiting for the first command to show up (blocking!)
//then consuming the commands until done. //then consuming the commands until done.
while (!done) { while (!done && !stopping) {
if (!commandQueue->pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) { if (!commandQueue->pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) {
continue; continue;
} }
@ -51,7 +51,7 @@ void DemodulatorWorkerThread::run() {
break; break;
} }
done = commandQueue->empty(); done = commandQueue->empty();
} } //end while done.
if ((makeDemod || filterChanged) && !stopping) { if ((makeDemod || filterChanged) && !stopping) {
DemodulatorWorkerThreadResult result(DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS); DemodulatorWorkerThreadResult result(DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS);

View File

@ -76,6 +76,9 @@ void FFTVisualDataThread::run() {
wproc.run(); wproc.run();
} }
} }
pipeIQDataIn->flush();
pipeFFTDataOut->flush();
// std::cout << "FFT visual data thread done." << std::endl; // std::cout << "FFT visual data thread done." << std::endl;
} }

View File

@ -225,8 +225,9 @@ void SDRPostThread::run() {
iqVisualQueue->flush(); iqVisualQueue->flush();
} }
// buffers.purge(); iqDataInQueue->flush();
// visualDataBuffers.purge(); iqDataOutQueue->flush();
iqActiveDemodVisualQueue->flush();
// std::cout << "SDR post-processing thread done." << std::endl; // std::cout << "SDR post-processing thread done." << std::endl;
} }

View File

@ -377,8 +377,7 @@ void SDRThread::readLoop() {
updateSettings(); updateSettings();
readStream(iqDataOutQueue); readStream(iqDataOutQueue);
} }
iqDataOutQueue->flush();
buffers.purge();
} }
void SDRThread::updateGains() { void SDRThread::updateGains() {