Merge pull request #391 from cjcliffe/vsonnier-thread_clean_termination

@vsonnier thread clean termination
This commit is contained in:
Charles J. Cliffe 2016-06-28 18:46:06 -04:00 committed by GitHub
commit c6c64ee7fb
24 changed files with 135 additions and 65 deletions

View File

@ -310,6 +310,11 @@ int CubicSDR::OnExit() {
demodVisualThread->terminate();
t_DemodVisual->join();
//Poor man join
sdrPostThread->isTerminated(1000);
spectrumVisualThread->isTerminated(1000);
demodVisualThread->isTerminated(1000);
delete sdrThread;
delete sdrPostThread;

View File

@ -3,18 +3,33 @@
std::mutex ReBufferGC::g_mutex;
std::set<ReferenceCounter *> ReBufferGC::garbage;
#define SPIN_WAIT_SLEEP_MS 5
IOThread::IOThread() {
terminated.store(false);
stopping.store(false);
}
IOThread::~IOThread() {
terminated.store(true);
stopping.store(true);
}
#ifdef __APPLE__
void *IOThread::threadMain() {
terminated.store(false);
run();
stopping.store(false);
try {
run();
}
catch (...) {
terminated.store(true);
stopping.store(true);
throw;
}
terminated.store(true);
stopping.store(true);
return this;
};
@ -24,20 +39,32 @@ void *IOThread::pthread_helper(void *context) {
#else
void IOThread::threadMain() {
terminated.store(false);
run();
stopping.store(false);
try {
run();
}
catch (...) {
terminated.store(true);
stopping.store(true);
throw;
}
terminated.store(true);
stopping.store(true);
};
#endif
void IOThread::setup() {
//redefined in subclasses
};
void IOThread::run() {
//redefined in subclasses
};
void IOThread::terminate() {
terminated.store(true);
stopping.store(true);
};
void IOThread::onBindOutput(std::string /* name */, ThreadQueueBase* /* threadQueue */) {
@ -66,6 +93,34 @@ ThreadQueueBase *IOThread::getOutputQueue(std::string qname) {
return output_queues[qname];
};
bool IOThread::isTerminated() {
bool IOThread::isTerminated(int waitMs) {
if (terminated.load()) {
return true;
}
else if (waitMs == 0) {
return false;
}
//this is a stupid busy plus sleep loop
int nbCyclesToWait = 0;
if (waitMs < 0) {
nbCyclesToWait = std::numeric_limits<int>::max();
}
else {
nbCyclesToWait = (waitMs / SPIN_WAIT_SLEEP_MS) + 1;
}
for ( int i = 0; i < nbCyclesToWait; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(SPIN_WAIT_SLEEP_MS));
if (terminated.load()) {
return true;
}
}
return terminated.load();
}

View File

@ -7,6 +7,7 @@
#include <set>
#include <string>
#include <iostream>
#include <thread>
#include "ThreadQueue.h"
#include "Timer.h"
@ -189,13 +190,23 @@ public:
#ifdef __APPLE__
virtual void *threadMain();
#else
//the thread Main call back itself
virtual void threadMain();
#endif
virtual void setup();
virtual void run();
//Request for termination (asynchronous)
virtual void terminate();
bool isTerminated();
//Returns true if the thread is indeed terminated, i.e the run() method
//has returned.
//If wait > 0 ms, the call is blocking at most 'waitMs' milliseconds for the thread to die, then returns.
//If wait < 0, the wait in infinite until the thread dies.
bool isTerminated(int waitMs = 0);
virtual void onBindOutput(std::string name, ThreadQueueBase* threadQueue);
virtual void onBindInput(std::string name, ThreadQueueBase* threadQueue);
@ -207,6 +218,13 @@ public:
protected:
std::map<std::string, ThreadQueueBase *, map_string_less> input_queues;
std::map<std::string, ThreadQueueBase *, map_string_less> output_queues;
std::atomic_bool terminated;
//true when a termination is ordered
std::atomic_bool stopping;
Timer gTimer;
private:
//true when the thread has really ended, i.e run() from threadMain() has returned.
std::atomic_bool terminated;
};

View File

@ -380,7 +380,7 @@ void AudioThread::run() {
inputQueue = static_cast<AudioThreadInputQueue *>(getInputQueue("AudioDataInput"));
threadQueueNotify = static_cast<DemodulatorThreadCommandQueue*>(getOutputQueue("NotifyQueue"));
while (!terminated) {
while (!stopping) {
AudioThreadCommand command;
cmdQueue.pop(command);
@ -430,7 +430,7 @@ void AudioThread::run() {
}
void AudioThread::terminate() {
terminated = true;
IOThread::terminate();
AudioThreadCommand endCond; // push an empty input to bump the queue
cmdQueue.push(endCond);
}

View File

@ -69,8 +69,8 @@ public:
int getOutputDevice();
void setSampleRate(int sampleRate);
int getSampleRate();
void run();
void terminate();
virtual void run();
virtual void terminate();
bool isActive();
void setActive(bool state);

View File

@ -35,10 +35,7 @@ DemodulatorInstance::DemodulatorInstance() {
#if ENABLE_DIGITAL_LAB
activeOutput = nullptr;
#endif
terminated.store(true);
demodTerminated.store(true);
audioTerminated.store(true);
preDemodTerminated.store(true);
active.store(false);
squelch.store(false);
muted.store(false);
@ -125,7 +122,6 @@ void DemodulatorInstance::run() {
#endif
active = true;
audioTerminated = demodTerminated = preDemodTerminated = terminated = false;
}
@ -163,7 +159,7 @@ bool DemodulatorInstance::isTerminated() {
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED:
if (t_Audio) {
t_Audio->join();
audioTerminated = true;
delete t_Audio;
t_Audio = nullptr;
}
@ -183,7 +179,6 @@ bool DemodulatorInstance::isTerminated() {
closeOutput();
}
#endif
demodTerminated = true;
break;
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED:
if (t_PreDemod) {
@ -193,7 +188,6 @@ bool DemodulatorInstance::isTerminated() {
t_PreDemod->join();
delete t_PreDemod;
#endif
preDemodTerminated = true;
t_PreDemod = nullptr;
}
break;
@ -202,7 +196,12 @@ bool DemodulatorInstance::isTerminated() {
}
}
terminated = audioTerminated && demodTerminated && preDemodTerminated;
//
bool audioTerminated = audioThread->isTerminated();
bool demodTerminated = demodulatorThread->isTerminated();
bool preDemodTerminated = demodulatorPreThread->isTerminated();
bool terminated = audioTerminated && demodTerminated && preDemodTerminated;
return terminated;
}

View File

@ -141,10 +141,7 @@ private:
std::atomic<std::string *> label; //
// User editable buffer, 16 bit string.
std::atomic<std::wstring *> user_label;
std::atomic_bool terminated; //
std::atomic_bool demodTerminated; //
std::atomic_bool audioTerminated; //
std::atomic_bool preDemodTerminated;
std::atomic_bool active;
std::atomic_bool squelch;
std::atomic_bool muted;

View File

@ -65,7 +65,7 @@ void DemodulatorPreThread::run() {
t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread);
while (!terminated) {
while (!stopping) {
DemodulatorThreadIQData *inp;
iqInputQueue->pop(inp);
@ -211,7 +211,7 @@ void DemodulatorPreThread::run() {
inp->decRefCount();
if (!terminated && !workerResults->empty()) {
if (!stopping && !workerResults->empty()) {
while (!workerResults->empty()) {
DemodulatorWorkerThreadResult result;
workerResults->pop(result);
@ -341,7 +341,7 @@ int DemodulatorPreThread::getAudioSampleRate() {
}
void DemodulatorPreThread::terminate() {
terminated = true;
IOThread::terminate();
DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue
iqInputQueue->push(inp);
DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL);

View File

@ -15,7 +15,7 @@ public:
DemodulatorPreThread(DemodulatorInstance *parent);
~DemodulatorPreThread();
void run();
virtual void run();
void setDemodType(std::string demodType);
std::string getDemodType();
@ -34,7 +34,7 @@ public:
bool isInitialized();
void terminate();
virtual void terminate();
Modem *getModem();
ModemKit *getModemKit();

View File

@ -73,7 +73,7 @@ void DemodulatorThread::run() {
ModemIQData modemData;
while (!terminated) {
while (!stopping) {
DemodulatorThreadPostIQData *inp;
iqInputQueue->pop(inp);
// std::lock_guard < std::mutex > lock(inp->m_mutex);
@ -271,7 +271,7 @@ void DemodulatorThread::run() {
inp->decRefCount();
}
// end while !terminated
// end while !stopping
// Purge any unused inputs
while (!iqInputQueue->empty()) {
@ -301,7 +301,7 @@ void DemodulatorThread::run() {
}
void DemodulatorThread::terminate() {
terminated = true;
IOThread::terminate();
DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue
iqInputQueue->push(inp);
}

View File

@ -17,7 +17,7 @@ void DemodulatorWorkerThread::run() {
commandQueue = static_cast<DemodulatorThreadWorkerCommandQueue *>(getInputQueue("WorkerCommandQueue"));
resultQueue = static_cast<DemodulatorThreadWorkerResultQueue *>(getOutputQueue("WorkerResultQueue"));
while (!terminated) {
while (!stopping) {
bool filterChanged = false;
bool makeDemod = false;
DemodulatorWorkerThreadCommand filterCommand, demodCommand;
@ -41,7 +41,7 @@ void DemodulatorWorkerThread::run() {
done = commandQueue->empty();
}
if ((makeDemod || filterChanged) && !terminated) {
if ((makeDemod || filterChanged) && !stopping) {
DemodulatorWorkerThreadResult result(DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS);
@ -103,7 +103,7 @@ void DemodulatorWorkerThread::run() {
}
void DemodulatorWorkerThread::terminate() {
terminated = true;
IOThread::terminate();
DemodulatorWorkerThreadCommand inp; // push dummy to nudge queue
commandQueue->push(inp);
}

View File

@ -75,7 +75,7 @@ public:
DemodulatorWorkerThread();
~DemodulatorWorkerThread();
void run();
virtual void run();
void setCommandQueue(DemodulatorThreadWorkerCommandQueue *tQueue) {
commandQueue = tQueue;
@ -85,7 +85,7 @@ public:
resultQueue = tQueue;
}
void terminate();
virtual void terminate();
protected:

View File

@ -37,7 +37,7 @@ void FFTVisualDataThread::run() {
// std::cout << "FFT visual data thread started." << std::endl;
while(!terminated) {
while(!stopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// std::this_thread::yield();

View File

@ -13,7 +13,7 @@ public:
int getLinesPerSecond();
SpectrumVisualProcessor *getProcessor();
void run();
virtual void run();
protected:
FFTDataDistributor fftDistrib;

View File

@ -15,7 +15,7 @@ SpectrumVisualProcessor *SpectrumVisualDataThread::getProcessor() {
void SpectrumVisualDataThread::run() {
// std::cout << "Spectrum visual data thread started." << std::endl;
while(!terminated) {
while(!stopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// std::this_thread::yield();
sproc.run();

View File

@ -9,7 +9,7 @@ public:
~SpectrumVisualDataThread();
SpectrumVisualProcessor *getProcessor();
void run();
virtual void run();
protected:
SpectrumVisualProcessor sproc;

View File

@ -3,7 +3,6 @@
std::vector<const struct rig_caps *> RigThread::rigCaps;
RigThread::RigThread() {
terminated.store(true);
freq = wxGetApp().getFrequency();
newFreq = freq;
freqChanged.store(true);
@ -46,8 +45,7 @@ void RigThread::run() {
int retcode, status;
termStatus = 0;
terminated.store(false);
std::cout << "Rig thread starting." << std::endl;
rig = rig_init(rigModel);
@ -57,7 +55,7 @@ void RigThread::run() {
if (retcode != 0) {
std::cout << "Rig failed to init. " << std::endl;
terminated.store(true);
IOThread::terminate();
return;
}
@ -69,7 +67,7 @@ void RigThread::run() {
std::cout << "Rig info was NULL." << std::endl;
}
while (!terminated.load()) {
while (!stopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(150));
DemodulatorInstance *activeDemod = wxGetApp().getDemodMgr().getActiveDemodulator();
@ -77,7 +75,7 @@ void RigThread::run() {
if (freqChanged.load() && (controlMode.load() || setOneShot.load())) {
status = rig_get_freq(rig, RIG_VFO_CURR, &freq);
if (status == 0 && !terminated.load()) {
if (status == 0 && !stopping) {
if (freq != newFreq && setOneShot.load()) {
freq = newFreq;
@ -96,7 +94,7 @@ void RigThread::run() {
status = rig_get_freq(rig, RIG_VFO_CURR, &checkFreq);
if (status == 0 && !terminated.load()) {
if (status == 0 && !stopping) {
if (checkFreq != freq && followMode.load()) {
freq = checkFreq;
if (followModem.load()) {

View File

@ -22,7 +22,7 @@ public:
~RigThread();
void initRig(rig_model_t rig_model, std::string rig_file, int serial_rate);
void run();
virtual void run();
int terminationStatus();

View File

@ -309,7 +309,7 @@ std::vector<SDRDeviceInfo *> *SDREnumerator::enumerate_devices(std::string remot
void SDREnumerator::run() {
std::cout << "SDR enumerator starting." << std::endl;
terminated.store(false);
wxGetApp().sdrEnumThreadNotify(SDREnumerator::SDR_ENUM_MESSAGE, "Scanning local devices, please wait..");
SDREnumerator::enumerate_devices("");
@ -323,7 +323,6 @@ void SDREnumerator::run() {
}
std::cout << "Reporting enumeration complete." << std::endl;
terminated.store(true);
wxGetApp().sdrEnumThreadNotify(SDREnumerator::SDR_ENUM_DEVICES_READY, "Finished scanning devices.");
std::cout << "SDR enumerator done." << std::endl;

View File

@ -23,7 +23,7 @@ public:
static std::vector<SDRDeviceInfo *> *enumerate_devices(std::string remoteAddr = "", bool noInit=false);
void run();
virtual void run();
static SoapySDR::Kwargs argsStrToKwargs(const std::string &args);
static void addRemote(std::string remoteAddr);

View File

@ -184,7 +184,7 @@ void SDRPostThread::run() {
iqDataInQueue->set_max_num_items(0);
while (!terminated) {
while (!stopping) {
SDRThreadIQData *data_in;
iqDataInQueue->pop(data_in);
@ -229,7 +229,7 @@ void SDRPostThread::run() {
}
void SDRPostThread::terminate() {
terminated = true;
IOThread::terminate();
SDRThreadIQData *dummy = new SDRThreadIQData;
iqDataInQueue->push(dummy);
}

View File

@ -16,8 +16,8 @@ public:
void bindDemodulators(std::vector<DemodulatorInstance *> *demods);
void removeDemodulator(DemodulatorInstance *demod);
void run();
void terminate();
virtual void run();
virtual void terminate();
void runSingleCH(SDRThreadIQData *data_in);
void runPFBCH(SDRThreadIQData *data_in);

View File

@ -176,7 +176,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
}
}
while (n_read < nElems && !terminated) {
while (n_read < nElems && !stopping) {
int n_requested = nElems-n_read;
int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs);
if ((n_read + n_stream_read) > nElems) {
@ -194,7 +194,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
}
}
if (n_read > 0 && !terminated) {
if (n_read > 0 && !stopping) {
SDRThreadIQData *dataOut = buffers.getBuffer();
if (iq_swap.load()) {
@ -225,7 +225,7 @@ void SDRThread::readLoop() {
updateGains();
while (!terminated.load()) {
while (!stopping.load()) {
updateSettings();
readStream(iqDataOutQueue);
}
@ -360,7 +360,6 @@ void SDRThread::run() {
//#endif
std::cout << "SDR thread starting." << std::endl;
terminated.store(false);
SDRDeviceInfo *activeDev = deviceInfo.load();
@ -380,8 +379,8 @@ void SDRThread::run() {
std::cout << "SDR thread done." << std::endl;
if (!terminated.load()) {
terminated.store(true);
if (!stopping.load()) {
stopping.store(true);
wxGetApp().sdrThreadNotify(SDRThread::SDR_THREAD_TERMINATED, "Done.");
}
}

View File

@ -50,7 +50,7 @@ public:
~SDRThread();
enum SDRThreadState { SDR_THREAD_MESSAGE, SDR_THREAD_INITIALIZED, SDR_THREAD_TERMINATED, SDR_THREAD_FAILED };
void run();
virtual void run();
SDRDeviceInfo *getDevice();
void setDevice(SDRDeviceInfo *dev);