diff --git a/src/CubicSDR.cpp b/src/CubicSDR.cpp index 6ec5fc1..6d14321 100644 --- a/src/CubicSDR.cpp +++ b/src/CubicSDR.cpp @@ -310,6 +310,11 @@ int CubicSDR::OnExit() { demodVisualThread->terminate(); t_DemodVisual->join(); + //Poor man join + sdrPostThread->isTerminated(1000); + spectrumVisualThread->isTerminated(1000); + demodVisualThread->isTerminated(1000); + delete sdrThread; delete sdrPostThread; diff --git a/src/IOThread.cpp b/src/IOThread.cpp index b3ffb0a..5cb5521 100644 --- a/src/IOThread.cpp +++ b/src/IOThread.cpp @@ -3,18 +3,33 @@ std::mutex ReBufferGC::g_mutex; std::set ReBufferGC::garbage; +#define SPIN_WAIT_SLEEP_MS 5 + IOThread::IOThread() { terminated.store(false); + stopping.store(false); } IOThread::~IOThread() { - + terminated.store(true); + stopping.store(true); } #ifdef __APPLE__ void *IOThread::threadMain() { terminated.store(false); - run(); + stopping.store(false); + try { + run(); + } + catch (...) { + terminated.store(true); + stopping.store(true); + throw; + } + + terminated.store(true); + stopping.store(true); return this; }; @@ -24,20 +39,32 @@ void *IOThread::pthread_helper(void *context) { #else void IOThread::threadMain() { terminated.store(false); - run(); + stopping.store(false); + try { + run(); + } + catch (...) { + terminated.store(true); + stopping.store(true); + throw; + } + + terminated.store(true); + stopping.store(true); }; #endif void IOThread::setup() { - + //redefined in subclasses }; void IOThread::run() { - + //redefined in subclasses }; + void IOThread::terminate() { - terminated.store(true); + stopping.store(true); }; void IOThread::onBindOutput(std::string /* name */, ThreadQueueBase* /* threadQueue */) { @@ -66,6 +93,34 @@ ThreadQueueBase *IOThread::getOutputQueue(std::string qname) { return output_queues[qname]; }; -bool IOThread::isTerminated() { +bool IOThread::isTerminated(int waitMs) { + + if (terminated.load()) { + return true; + } + else if (waitMs == 0) { + return false; + } + + //this is a stupid busy plus sleep loop + int nbCyclesToWait = 0; + + if (waitMs < 0) { + nbCyclesToWait = std::numeric_limits::max(); + } + else { + + nbCyclesToWait = (waitMs / SPIN_WAIT_SLEEP_MS) + 1; + } + + for ( int i = 0; i < nbCyclesToWait; i++) { + + std::this_thread::sleep_for(std::chrono::milliseconds(SPIN_WAIT_SLEEP_MS)); + + if (terminated.load()) { + return true; + } + } + return terminated.load(); } diff --git a/src/IOThread.h b/src/IOThread.h index f4645a8..2fe8504 100644 --- a/src/IOThread.h +++ b/src/IOThread.h @@ -7,6 +7,7 @@ #include #include #include +#include #include "ThreadQueue.h" #include "Timer.h" @@ -189,13 +190,23 @@ public: #ifdef __APPLE__ virtual void *threadMain(); #else + + //the thread Main call back itself virtual void threadMain(); #endif virtual void setup(); virtual void run(); + + //Request for termination (asynchronous) virtual void terminate(); - bool isTerminated(); + + //Returns true if the thread is indeed terminated, i.e the run() method + //has returned. + //If wait > 0 ms, the call is blocking at most 'waitMs' milliseconds for the thread to die, then returns. + //If wait < 0, the wait in infinite until the thread dies. + bool isTerminated(int waitMs = 0); + virtual void onBindOutput(std::string name, ThreadQueueBase* threadQueue); virtual void onBindInput(std::string name, ThreadQueueBase* threadQueue); @@ -207,6 +218,13 @@ public: protected: std::map input_queues; std::map output_queues; - std::atomic_bool terminated; + + //true when a termination is ordered + std::atomic_bool stopping; Timer gTimer; + +private: + //true when the thread has really ended, i.e run() from threadMain() has returned. + std::atomic_bool terminated; + }; diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index 0e400d0..ffd08cf 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -380,7 +380,7 @@ void AudioThread::run() { inputQueue = static_cast(getInputQueue("AudioDataInput")); threadQueueNotify = static_cast(getOutputQueue("NotifyQueue")); - while (!terminated) { + while (!stopping) { AudioThreadCommand command; cmdQueue.pop(command); @@ -430,7 +430,7 @@ void AudioThread::run() { } void AudioThread::terminate() { - terminated = true; + IOThread::terminate(); AudioThreadCommand endCond; // push an empty input to bump the queue cmdQueue.push(endCond); } diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index 97faaa1..fb17b1b 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -69,8 +69,8 @@ public: int getOutputDevice(); void setSampleRate(int sampleRate); int getSampleRate(); - void run(); - void terminate(); + virtual void run(); + virtual void terminate(); bool isActive(); void setActive(bool state); diff --git a/src/demod/DemodulatorInstance.cpp b/src/demod/DemodulatorInstance.cpp index fa90ed1..781c06f 100644 --- a/src/demod/DemodulatorInstance.cpp +++ b/src/demod/DemodulatorInstance.cpp @@ -35,10 +35,7 @@ DemodulatorInstance::DemodulatorInstance() { #if ENABLE_DIGITAL_LAB activeOutput = nullptr; #endif - terminated.store(true); - demodTerminated.store(true); - audioTerminated.store(true); - preDemodTerminated.store(true); + active.store(false); squelch.store(false); muted.store(false); @@ -125,7 +122,6 @@ void DemodulatorInstance::run() { #endif active = true; - audioTerminated = demodTerminated = preDemodTerminated = terminated = false; } @@ -163,7 +159,7 @@ bool DemodulatorInstance::isTerminated() { case DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED: if (t_Audio) { t_Audio->join(); - audioTerminated = true; + delete t_Audio; t_Audio = nullptr; } @@ -183,7 +179,6 @@ bool DemodulatorInstance::isTerminated() { closeOutput(); } #endif - demodTerminated = true; break; case DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED: if (t_PreDemod) { @@ -193,7 +188,6 @@ bool DemodulatorInstance::isTerminated() { t_PreDemod->join(); delete t_PreDemod; #endif - preDemodTerminated = true; t_PreDemod = nullptr; } break; @@ -202,7 +196,12 @@ bool DemodulatorInstance::isTerminated() { } } - terminated = audioTerminated && demodTerminated && preDemodTerminated; + // + bool audioTerminated = audioThread->isTerminated(); + bool demodTerminated = demodulatorThread->isTerminated(); + bool preDemodTerminated = demodulatorPreThread->isTerminated(); + + bool terminated = audioTerminated && demodTerminated && preDemodTerminated; return terminated; } diff --git a/src/demod/DemodulatorInstance.h b/src/demod/DemodulatorInstance.h index 8c030d5..91fba66 100644 --- a/src/demod/DemodulatorInstance.h +++ b/src/demod/DemodulatorInstance.h @@ -141,10 +141,7 @@ private: std::atomic label; // // User editable buffer, 16 bit string. std::atomic user_label; - std::atomic_bool terminated; // - std::atomic_bool demodTerminated; // - std::atomic_bool audioTerminated; // - std::atomic_bool preDemodTerminated; + std::atomic_bool active; std::atomic_bool squelch; std::atomic_bool muted; diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index 554b221..0d697d3 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -65,7 +65,7 @@ void DemodulatorPreThread::run() { t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread); - while (!terminated) { + while (!stopping) { DemodulatorThreadIQData *inp; iqInputQueue->pop(inp); @@ -211,7 +211,7 @@ void DemodulatorPreThread::run() { inp->decRefCount(); - if (!terminated && !workerResults->empty()) { + if (!stopping && !workerResults->empty()) { while (!workerResults->empty()) { DemodulatorWorkerThreadResult result; workerResults->pop(result); @@ -341,7 +341,7 @@ int DemodulatorPreThread::getAudioSampleRate() { } void DemodulatorPreThread::terminate() { - terminated = true; + IOThread::terminate(); DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue iqInputQueue->push(inp); DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL); diff --git a/src/demod/DemodulatorPreThread.h b/src/demod/DemodulatorPreThread.h index e8c8879..ab5d959 100644 --- a/src/demod/DemodulatorPreThread.h +++ b/src/demod/DemodulatorPreThread.h @@ -15,7 +15,7 @@ public: DemodulatorPreThread(DemodulatorInstance *parent); ~DemodulatorPreThread(); - void run(); + virtual void run(); void setDemodType(std::string demodType); std::string getDemodType(); @@ -34,7 +34,7 @@ public: bool isInitialized(); - void terminate(); + virtual void terminate(); Modem *getModem(); ModemKit *getModemKit(); diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index c9b8a96..9441b1c 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -73,7 +73,7 @@ void DemodulatorThread::run() { ModemIQData modemData; - while (!terminated) { + while (!stopping) { DemodulatorThreadPostIQData *inp; iqInputQueue->pop(inp); // std::lock_guard < std::mutex > lock(inp->m_mutex); @@ -271,7 +271,7 @@ void DemodulatorThread::run() { inp->decRefCount(); } - // end while !terminated + // end while !stopping // Purge any unused inputs while (!iqInputQueue->empty()) { @@ -301,7 +301,7 @@ void DemodulatorThread::run() { } void DemodulatorThread::terminate() { - terminated = true; + IOThread::terminate(); DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue iqInputQueue->push(inp); } diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index 4d4a27d..4fa600b 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -17,7 +17,7 @@ void DemodulatorWorkerThread::run() { commandQueue = static_cast(getInputQueue("WorkerCommandQueue")); resultQueue = static_cast(getOutputQueue("WorkerResultQueue")); - while (!terminated) { + while (!stopping) { bool filterChanged = false; bool makeDemod = false; DemodulatorWorkerThreadCommand filterCommand, demodCommand; @@ -41,7 +41,7 @@ void DemodulatorWorkerThread::run() { done = commandQueue->empty(); } - if ((makeDemod || filterChanged) && !terminated) { + if ((makeDemod || filterChanged) && !stopping) { DemodulatorWorkerThreadResult result(DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS); @@ -103,7 +103,7 @@ void DemodulatorWorkerThread::run() { } void DemodulatorWorkerThread::terminate() { - terminated = true; + IOThread::terminate(); DemodulatorWorkerThreadCommand inp; // push dummy to nudge queue commandQueue->push(inp); } diff --git a/src/demod/DemodulatorWorkerThread.h b/src/demod/DemodulatorWorkerThread.h index d2a8b4c..140046a 100644 --- a/src/demod/DemodulatorWorkerThread.h +++ b/src/demod/DemodulatorWorkerThread.h @@ -75,7 +75,7 @@ public: DemodulatorWorkerThread(); ~DemodulatorWorkerThread(); - void run(); + virtual void run(); void setCommandQueue(DemodulatorThreadWorkerCommandQueue *tQueue) { commandQueue = tQueue; @@ -85,7 +85,7 @@ public: resultQueue = tQueue; } - void terminate(); + virtual void terminate(); protected: diff --git a/src/process/FFTVisualDataThread.cpp b/src/process/FFTVisualDataThread.cpp index fc93b60..8eb9186 100644 --- a/src/process/FFTVisualDataThread.cpp +++ b/src/process/FFTVisualDataThread.cpp @@ -37,7 +37,7 @@ void FFTVisualDataThread::run() { // std::cout << "FFT visual data thread started." << std::endl; - while(!terminated) { + while(!stopping) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); // std::this_thread::yield(); diff --git a/src/process/FFTVisualDataThread.h b/src/process/FFTVisualDataThread.h index 31eb971..bb16be4 100644 --- a/src/process/FFTVisualDataThread.h +++ b/src/process/FFTVisualDataThread.h @@ -13,7 +13,7 @@ public: int getLinesPerSecond(); SpectrumVisualProcessor *getProcessor(); - void run(); + virtual void run(); protected: FFTDataDistributor fftDistrib; diff --git a/src/process/SpectrumVisualDataThread.cpp b/src/process/SpectrumVisualDataThread.cpp index d981297..0c8981a 100644 --- a/src/process/SpectrumVisualDataThread.cpp +++ b/src/process/SpectrumVisualDataThread.cpp @@ -15,7 +15,7 @@ SpectrumVisualProcessor *SpectrumVisualDataThread::getProcessor() { void SpectrumVisualDataThread::run() { // std::cout << "Spectrum visual data thread started." << std::endl; - while(!terminated) { + while(!stopping) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); // std::this_thread::yield(); sproc.run(); diff --git a/src/process/SpectrumVisualDataThread.h b/src/process/SpectrumVisualDataThread.h index 017b519..0987193 100644 --- a/src/process/SpectrumVisualDataThread.h +++ b/src/process/SpectrumVisualDataThread.h @@ -9,7 +9,7 @@ public: ~SpectrumVisualDataThread(); SpectrumVisualProcessor *getProcessor(); - void run(); + virtual void run(); protected: SpectrumVisualProcessor sproc; diff --git a/src/rig/RigThread.h b/src/rig/RigThread.h index 386e17b..53b521f 100644 --- a/src/rig/RigThread.h +++ b/src/rig/RigThread.h @@ -22,7 +22,7 @@ public: ~RigThread(); void initRig(rig_model_t rig_model, std::string rig_file, int serial_rate); - void run(); + virtual void run(); int terminationStatus(); diff --git a/src/sdr/SDREnumerator.cpp b/src/sdr/SDREnumerator.cpp index ae57e8d..116234b 100644 --- a/src/sdr/SDREnumerator.cpp +++ b/src/sdr/SDREnumerator.cpp @@ -309,7 +309,7 @@ std::vector *SDREnumerator::enumerate_devices(std::string remot void SDREnumerator::run() { std::cout << "SDR enumerator starting." << std::endl; - terminated.store(false); + wxGetApp().sdrEnumThreadNotify(SDREnumerator::SDR_ENUM_MESSAGE, "Scanning local devices, please wait.."); SDREnumerator::enumerate_devices(""); @@ -323,7 +323,6 @@ void SDREnumerator::run() { } std::cout << "Reporting enumeration complete." << std::endl; - terminated.store(true); wxGetApp().sdrEnumThreadNotify(SDREnumerator::SDR_ENUM_DEVICES_READY, "Finished scanning devices."); std::cout << "SDR enumerator done." << std::endl; diff --git a/src/sdr/SDREnumerator.h b/src/sdr/SDREnumerator.h index 384d003..6511243 100644 --- a/src/sdr/SDREnumerator.h +++ b/src/sdr/SDREnumerator.h @@ -23,7 +23,7 @@ public: static std::vector *enumerate_devices(std::string remoteAddr = "", bool noInit=false); - void run(); + virtual void run(); static SoapySDR::Kwargs argsStrToKwargs(const std::string &args); static void addRemote(std::string remoteAddr); diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index 59dd27f..f730375 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -184,7 +184,7 @@ void SDRPostThread::run() { iqDataInQueue->set_max_num_items(0); - while (!terminated) { + while (!stopping) { SDRThreadIQData *data_in; iqDataInQueue->pop(data_in); @@ -229,7 +229,7 @@ void SDRPostThread::run() { } void SDRPostThread::terminate() { - terminated = true; + IOThread::terminate(); SDRThreadIQData *dummy = new SDRThreadIQData; iqDataInQueue->push(dummy); } diff --git a/src/sdr/SDRPostThread.h b/src/sdr/SDRPostThread.h index ad396b1..c27fba1 100644 --- a/src/sdr/SDRPostThread.h +++ b/src/sdr/SDRPostThread.h @@ -16,8 +16,8 @@ public: void bindDemodulators(std::vector *demods); void removeDemodulator(DemodulatorInstance *demod); - void run(); - void terminate(); + virtual void run(); + virtual void terminate(); void runSingleCH(SDRThreadIQData *data_in); void runPFBCH(SDRThreadIQData *data_in); diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index f283b53..f4f9083 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -176,7 +176,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { } } - while (n_read < nElems && !terminated) { + while (n_read < nElems && !stopping) { int n_requested = nElems-n_read; int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs); if ((n_read + n_stream_read) > nElems) { @@ -194,7 +194,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { } } - if (n_read > 0 && !terminated) { + if (n_read > 0 && !stopping) { SDRThreadIQData *dataOut = buffers.getBuffer(); if (iq_swap.load()) { @@ -225,7 +225,7 @@ void SDRThread::readLoop() { updateGains(); - while (!terminated.load()) { + while (!stopping.load()) { updateSettings(); readStream(iqDataOutQueue); } @@ -360,7 +360,6 @@ void SDRThread::run() { //#endif std::cout << "SDR thread starting." << std::endl; - terminated.store(false); SDRDeviceInfo *activeDev = deviceInfo.load(); @@ -380,8 +379,8 @@ void SDRThread::run() { std::cout << "SDR thread done." << std::endl; - if (!terminated.load()) { - terminated.store(true); + if (!stopping.load()) { + stopping.store(true); wxGetApp().sdrThreadNotify(SDRThread::SDR_THREAD_TERMINATED, "Done."); } } diff --git a/src/sdr/SoapySDRThread.h b/src/sdr/SoapySDRThread.h index fb4b32f..0a04728 100644 --- a/src/sdr/SoapySDRThread.h +++ b/src/sdr/SoapySDRThread.h @@ -50,7 +50,7 @@ public: ~SDRThread(); enum SDRThreadState { SDR_THREAD_MESSAGE, SDR_THREAD_INITIALIZED, SDR_THREAD_TERMINATED, SDR_THREAD_FAILED }; - void run(); + virtual void run(); SDRDeviceInfo *getDevice(); void setDevice(SDRDeviceInfo *dev);