mirror of
https://github.com/cjcliffe/CubicSDR.git
synced 2024-11-12 23:26:10 -05:00
THREAD_CLEAN: Clearly distinguish between a request to stop by terminate() from the actual termination isTerminated()
This commit is contained in:
parent
c57478dc2a
commit
52de909cfb
@ -310,6 +310,11 @@ int CubicSDR::OnExit() {
|
||||
demodVisualThread->terminate();
|
||||
t_DemodVisual->join();
|
||||
|
||||
//Poor man join
|
||||
sdrPostThread->isTerminated(1000);
|
||||
spectrumVisualThread->isTerminated(1000);
|
||||
demodVisualThread->isTerminated(1000);
|
||||
|
||||
delete sdrThread;
|
||||
|
||||
delete sdrPostThread;
|
||||
|
@ -3,18 +3,33 @@
|
||||
std::mutex ReBufferGC::g_mutex;
|
||||
std::set<ReferenceCounter *> ReBufferGC::garbage;
|
||||
|
||||
#define SPIN_WAIT_SLEEP_MS 5
|
||||
|
||||
IOThread::IOThread() {
|
||||
terminated.store(false);
|
||||
stopping.store(false);
|
||||
}
|
||||
|
||||
IOThread::~IOThread() {
|
||||
|
||||
terminated.store(true);
|
||||
stopping.store(true);
|
||||
}
|
||||
|
||||
#ifdef __APPLE__
|
||||
void *IOThread::threadMain() {
|
||||
terminated.store(false);
|
||||
run();
|
||||
stopping.store(false);
|
||||
try {
|
||||
run();
|
||||
}
|
||||
catch (...) {
|
||||
terminated.store(true);
|
||||
stopping.store(true);
|
||||
throw;
|
||||
}
|
||||
|
||||
terminated.store(true);
|
||||
stopping.store(true);
|
||||
return this;
|
||||
};
|
||||
|
||||
@ -24,20 +39,32 @@ void *IOThread::pthread_helper(void *context) {
|
||||
#else
|
||||
void IOThread::threadMain() {
|
||||
terminated.store(false);
|
||||
run();
|
||||
stopping.store(false);
|
||||
try {
|
||||
run();
|
||||
}
|
||||
catch (...) {
|
||||
terminated.store(true);
|
||||
stopping.store(true);
|
||||
throw;
|
||||
}
|
||||
|
||||
terminated.store(true);
|
||||
stopping.store(true);
|
||||
};
|
||||
#endif
|
||||
|
||||
void IOThread::setup() {
|
||||
|
||||
//redefined in subclasses
|
||||
};
|
||||
|
||||
void IOThread::run() {
|
||||
|
||||
//redefined in subclasses
|
||||
};
|
||||
|
||||
|
||||
void IOThread::terminate() {
|
||||
terminated.store(true);
|
||||
stopping.store(true);
|
||||
};
|
||||
|
||||
void IOThread::onBindOutput(std::string /* name */, ThreadQueueBase* /* threadQueue */) {
|
||||
@ -66,6 +93,34 @@ ThreadQueueBase *IOThread::getOutputQueue(std::string qname) {
|
||||
return output_queues[qname];
|
||||
};
|
||||
|
||||
bool IOThread::isTerminated() {
|
||||
bool IOThread::isTerminated(int waitMs) {
|
||||
|
||||
if (terminated.load()) {
|
||||
return true;
|
||||
}
|
||||
else if (waitMs == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
//this is a stupid busy plus sleep loop
|
||||
int nbCyclesToWait = 0;
|
||||
|
||||
if (waitMs < 0) {
|
||||
nbCyclesToWait = std::numeric_limits<int>::max();
|
||||
}
|
||||
else {
|
||||
|
||||
nbCyclesToWait = (waitMs / SPIN_WAIT_SLEEP_MS) + 1;
|
||||
}
|
||||
|
||||
for ( int i = 0; i < nbCyclesToWait; i++) {
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(SPIN_WAIT_SLEEP_MS));
|
||||
|
||||
if (terminated.load()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return terminated.load();
|
||||
}
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
|
||||
#include "ThreadQueue.h"
|
||||
#include "Timer.h"
|
||||
@ -189,13 +190,23 @@ public:
|
||||
#ifdef __APPLE__
|
||||
virtual void *threadMain();
|
||||
#else
|
||||
|
||||
//the thread Main call back itself
|
||||
virtual void threadMain();
|
||||
#endif
|
||||
|
||||
virtual void setup();
|
||||
virtual void run();
|
||||
|
||||
//Request for termination (asynchronous)
|
||||
virtual void terminate();
|
||||
bool isTerminated();
|
||||
|
||||
//Returns true if the thread is indeed terminated, i.e the run() method
|
||||
//has returned.
|
||||
//If wait > 0 ms, the call is blocking at most 'waitMs' milliseconds for the thread to die, then returns.
|
||||
//If wait < 0, the wait in infinite until the thread dies.
|
||||
bool isTerminated(int waitMs = 0);
|
||||
|
||||
virtual void onBindOutput(std::string name, ThreadQueueBase* threadQueue);
|
||||
virtual void onBindInput(std::string name, ThreadQueueBase* threadQueue);
|
||||
|
||||
@ -207,6 +218,13 @@ public:
|
||||
protected:
|
||||
std::map<std::string, ThreadQueueBase *, map_string_less> input_queues;
|
||||
std::map<std::string, ThreadQueueBase *, map_string_less> output_queues;
|
||||
std::atomic_bool terminated;
|
||||
|
||||
//true when a termination is ordered
|
||||
std::atomic_bool stopping;
|
||||
Timer gTimer;
|
||||
|
||||
private:
|
||||
//true when the thread has really ended, i.e run() from threadMain() has returned.
|
||||
std::atomic_bool terminated;
|
||||
|
||||
};
|
||||
|
@ -380,7 +380,7 @@ void AudioThread::run() {
|
||||
inputQueue = static_cast<AudioThreadInputQueue *>(getInputQueue("AudioDataInput"));
|
||||
threadQueueNotify = static_cast<DemodulatorThreadCommandQueue*>(getOutputQueue("NotifyQueue"));
|
||||
|
||||
while (!terminated) {
|
||||
while (!stopping) {
|
||||
AudioThreadCommand command;
|
||||
cmdQueue.pop(command);
|
||||
|
||||
@ -430,7 +430,7 @@ void AudioThread::run() {
|
||||
}
|
||||
|
||||
void AudioThread::terminate() {
|
||||
terminated = true;
|
||||
IOThread::terminate();
|
||||
AudioThreadCommand endCond; // push an empty input to bump the queue
|
||||
cmdQueue.push(endCond);
|
||||
}
|
||||
|
@ -69,8 +69,8 @@ public:
|
||||
int getOutputDevice();
|
||||
void setSampleRate(int sampleRate);
|
||||
int getSampleRate();
|
||||
void run();
|
||||
void terminate();
|
||||
virtual void run();
|
||||
virtual void terminate();
|
||||
|
||||
bool isActive();
|
||||
void setActive(bool state);
|
||||
|
@ -35,10 +35,7 @@ DemodulatorInstance::DemodulatorInstance() {
|
||||
#if ENABLE_DIGITAL_LAB
|
||||
activeOutput = nullptr;
|
||||
#endif
|
||||
terminated.store(true);
|
||||
demodTerminated.store(true);
|
||||
audioTerminated.store(true);
|
||||
preDemodTerminated.store(true);
|
||||
|
||||
active.store(false);
|
||||
squelch.store(false);
|
||||
muted.store(false);
|
||||
@ -125,7 +122,6 @@ void DemodulatorInstance::run() {
|
||||
#endif
|
||||
|
||||
active = true;
|
||||
audioTerminated = demodTerminated = preDemodTerminated = terminated = false;
|
||||
|
||||
}
|
||||
|
||||
@ -163,7 +159,7 @@ bool DemodulatorInstance::isTerminated() {
|
||||
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED:
|
||||
if (t_Audio) {
|
||||
t_Audio->join();
|
||||
audioTerminated = true;
|
||||
|
||||
delete t_Audio;
|
||||
t_Audio = nullptr;
|
||||
}
|
||||
@ -183,7 +179,6 @@ bool DemodulatorInstance::isTerminated() {
|
||||
closeOutput();
|
||||
}
|
||||
#endif
|
||||
demodTerminated = true;
|
||||
break;
|
||||
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED:
|
||||
if (t_PreDemod) {
|
||||
@ -193,7 +188,6 @@ bool DemodulatorInstance::isTerminated() {
|
||||
t_PreDemod->join();
|
||||
delete t_PreDemod;
|
||||
#endif
|
||||
preDemodTerminated = true;
|
||||
t_PreDemod = nullptr;
|
||||
}
|
||||
break;
|
||||
@ -202,7 +196,12 @@ bool DemodulatorInstance::isTerminated() {
|
||||
}
|
||||
}
|
||||
|
||||
terminated = audioTerminated && demodTerminated && preDemodTerminated;
|
||||
//
|
||||
bool audioTerminated = audioThread->isTerminated();
|
||||
bool demodTerminated = demodulatorThread->isTerminated();
|
||||
bool preDemodTerminated = demodulatorPreThread->isTerminated();
|
||||
|
||||
bool terminated = audioTerminated && demodTerminated && preDemodTerminated;
|
||||
|
||||
return terminated;
|
||||
}
|
||||
|
@ -141,10 +141,7 @@ private:
|
||||
std::atomic<std::string *> label; //
|
||||
// User editable buffer, 16 bit string.
|
||||
std::atomic<std::wstring *> user_label;
|
||||
std::atomic_bool terminated; //
|
||||
std::atomic_bool demodTerminated; //
|
||||
std::atomic_bool audioTerminated; //
|
||||
std::atomic_bool preDemodTerminated;
|
||||
|
||||
std::atomic_bool active;
|
||||
std::atomic_bool squelch;
|
||||
std::atomic_bool muted;
|
||||
|
@ -65,7 +65,7 @@ void DemodulatorPreThread::run() {
|
||||
|
||||
t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread);
|
||||
|
||||
while (!terminated) {
|
||||
while (!stopping) {
|
||||
DemodulatorThreadIQData *inp;
|
||||
iqInputQueue->pop(inp);
|
||||
|
||||
@ -211,7 +211,7 @@ void DemodulatorPreThread::run() {
|
||||
|
||||
inp->decRefCount();
|
||||
|
||||
if (!terminated && !workerResults->empty()) {
|
||||
if (!stopping && !workerResults->empty()) {
|
||||
while (!workerResults->empty()) {
|
||||
DemodulatorWorkerThreadResult result;
|
||||
workerResults->pop(result);
|
||||
@ -341,7 +341,7 @@ int DemodulatorPreThread::getAudioSampleRate() {
|
||||
}
|
||||
|
||||
void DemodulatorPreThread::terminate() {
|
||||
terminated = true;
|
||||
IOThread::terminate();
|
||||
DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue
|
||||
iqInputQueue->push(inp);
|
||||
DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL);
|
||||
|
@ -15,7 +15,7 @@ public:
|
||||
DemodulatorPreThread(DemodulatorInstance *parent);
|
||||
~DemodulatorPreThread();
|
||||
|
||||
void run();
|
||||
virtual void run();
|
||||
|
||||
void setDemodType(std::string demodType);
|
||||
std::string getDemodType();
|
||||
@ -34,7 +34,7 @@ public:
|
||||
|
||||
bool isInitialized();
|
||||
|
||||
void terminate();
|
||||
virtual void terminate();
|
||||
|
||||
Modem *getModem();
|
||||
ModemKit *getModemKit();
|
||||
|
@ -73,7 +73,7 @@ void DemodulatorThread::run() {
|
||||
|
||||
ModemIQData modemData;
|
||||
|
||||
while (!terminated) {
|
||||
while (!stopping) {
|
||||
DemodulatorThreadPostIQData *inp;
|
||||
iqInputQueue->pop(inp);
|
||||
// std::lock_guard < std::mutex > lock(inp->m_mutex);
|
||||
@ -271,7 +271,7 @@ void DemodulatorThread::run() {
|
||||
|
||||
inp->decRefCount();
|
||||
}
|
||||
// end while !terminated
|
||||
// end while !stopping
|
||||
|
||||
// Purge any unused inputs
|
||||
while (!iqInputQueue->empty()) {
|
||||
@ -301,7 +301,7 @@ void DemodulatorThread::run() {
|
||||
}
|
||||
|
||||
void DemodulatorThread::terminate() {
|
||||
terminated = true;
|
||||
IOThread::terminate();
|
||||
DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue
|
||||
iqInputQueue->push(inp);
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ void DemodulatorWorkerThread::run() {
|
||||
commandQueue = static_cast<DemodulatorThreadWorkerCommandQueue *>(getInputQueue("WorkerCommandQueue"));
|
||||
resultQueue = static_cast<DemodulatorThreadWorkerResultQueue *>(getOutputQueue("WorkerResultQueue"));
|
||||
|
||||
while (!terminated) {
|
||||
while (!stopping) {
|
||||
bool filterChanged = false;
|
||||
bool makeDemod = false;
|
||||
DemodulatorWorkerThreadCommand filterCommand, demodCommand;
|
||||
@ -41,7 +41,7 @@ void DemodulatorWorkerThread::run() {
|
||||
done = commandQueue->empty();
|
||||
}
|
||||
|
||||
if ((makeDemod || filterChanged) && !terminated) {
|
||||
if ((makeDemod || filterChanged) && !stopping) {
|
||||
DemodulatorWorkerThreadResult result(DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS);
|
||||
|
||||
|
||||
@ -103,7 +103,7 @@ void DemodulatorWorkerThread::run() {
|
||||
}
|
||||
|
||||
void DemodulatorWorkerThread::terminate() {
|
||||
terminated = true;
|
||||
IOThread::terminate();
|
||||
DemodulatorWorkerThreadCommand inp; // push dummy to nudge queue
|
||||
commandQueue->push(inp);
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ public:
|
||||
DemodulatorWorkerThread();
|
||||
~DemodulatorWorkerThread();
|
||||
|
||||
void run();
|
||||
virtual void run();
|
||||
|
||||
void setCommandQueue(DemodulatorThreadWorkerCommandQueue *tQueue) {
|
||||
commandQueue = tQueue;
|
||||
@ -85,7 +85,7 @@ public:
|
||||
resultQueue = tQueue;
|
||||
}
|
||||
|
||||
void terminate();
|
||||
virtual void terminate();
|
||||
|
||||
protected:
|
||||
|
||||
|
@ -37,7 +37,7 @@ void FFTVisualDataThread::run() {
|
||||
|
||||
// std::cout << "FFT visual data thread started." << std::endl;
|
||||
|
||||
while(!terminated) {
|
||||
while(!stopping) {
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
// std::this_thread::yield();
|
||||
|
@ -13,7 +13,7 @@ public:
|
||||
int getLinesPerSecond();
|
||||
SpectrumVisualProcessor *getProcessor();
|
||||
|
||||
void run();
|
||||
virtual void run();
|
||||
|
||||
protected:
|
||||
FFTDataDistributor fftDistrib;
|
||||
|
@ -15,7 +15,7 @@ SpectrumVisualProcessor *SpectrumVisualDataThread::getProcessor() {
|
||||
void SpectrumVisualDataThread::run() {
|
||||
// std::cout << "Spectrum visual data thread started." << std::endl;
|
||||
|
||||
while(!terminated) {
|
||||
while(!stopping) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
// std::this_thread::yield();
|
||||
sproc.run();
|
||||
|
@ -9,7 +9,7 @@ public:
|
||||
~SpectrumVisualDataThread();
|
||||
SpectrumVisualProcessor *getProcessor();
|
||||
|
||||
void run();
|
||||
virtual void run();
|
||||
|
||||
protected:
|
||||
SpectrumVisualProcessor sproc;
|
||||
|
@ -22,7 +22,7 @@ public:
|
||||
~RigThread();
|
||||
|
||||
void initRig(rig_model_t rig_model, std::string rig_file, int serial_rate);
|
||||
void run();
|
||||
virtual void run();
|
||||
|
||||
int terminationStatus();
|
||||
|
||||
|
@ -309,7 +309,7 @@ std::vector<SDRDeviceInfo *> *SDREnumerator::enumerate_devices(std::string remot
|
||||
void SDREnumerator::run() {
|
||||
|
||||
std::cout << "SDR enumerator starting." << std::endl;
|
||||
terminated.store(false);
|
||||
|
||||
|
||||
wxGetApp().sdrEnumThreadNotify(SDREnumerator::SDR_ENUM_MESSAGE, "Scanning local devices, please wait..");
|
||||
SDREnumerator::enumerate_devices("");
|
||||
@ -323,7 +323,6 @@ void SDREnumerator::run() {
|
||||
}
|
||||
|
||||
std::cout << "Reporting enumeration complete." << std::endl;
|
||||
terminated.store(true);
|
||||
wxGetApp().sdrEnumThreadNotify(SDREnumerator::SDR_ENUM_DEVICES_READY, "Finished scanning devices.");
|
||||
std::cout << "SDR enumerator done." << std::endl;
|
||||
|
||||
|
@ -23,7 +23,7 @@ public:
|
||||
|
||||
static std::vector<SDRDeviceInfo *> *enumerate_devices(std::string remoteAddr = "", bool noInit=false);
|
||||
|
||||
void run();
|
||||
virtual void run();
|
||||
|
||||
static SoapySDR::Kwargs argsStrToKwargs(const std::string &args);
|
||||
static void addRemote(std::string remoteAddr);
|
||||
|
@ -184,7 +184,7 @@ void SDRPostThread::run() {
|
||||
|
||||
iqDataInQueue->set_max_num_items(0);
|
||||
|
||||
while (!terminated) {
|
||||
while (!stopping) {
|
||||
SDRThreadIQData *data_in;
|
||||
|
||||
iqDataInQueue->pop(data_in);
|
||||
@ -229,7 +229,7 @@ void SDRPostThread::run() {
|
||||
}
|
||||
|
||||
void SDRPostThread::terminate() {
|
||||
terminated = true;
|
||||
IOThread::terminate();
|
||||
SDRThreadIQData *dummy = new SDRThreadIQData;
|
||||
iqDataInQueue->push(dummy);
|
||||
}
|
||||
|
@ -16,8 +16,8 @@ public:
|
||||
void bindDemodulators(std::vector<DemodulatorInstance *> *demods);
|
||||
void removeDemodulator(DemodulatorInstance *demod);
|
||||
|
||||
void run();
|
||||
void terminate();
|
||||
virtual void run();
|
||||
virtual void terminate();
|
||||
|
||||
void runSingleCH(SDRThreadIQData *data_in);
|
||||
void runPFBCH(SDRThreadIQData *data_in);
|
||||
|
@ -176,7 +176,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
||||
}
|
||||
}
|
||||
|
||||
while (n_read < nElems && !terminated) {
|
||||
while (n_read < nElems && !stopping) {
|
||||
int n_requested = nElems-n_read;
|
||||
int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs);
|
||||
if ((n_read + n_stream_read) > nElems) {
|
||||
@ -194,7 +194,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
||||
}
|
||||
}
|
||||
|
||||
if (n_read > 0 && !terminated) {
|
||||
if (n_read > 0 && !stopping) {
|
||||
SDRThreadIQData *dataOut = buffers.getBuffer();
|
||||
|
||||
if (iq_swap.load()) {
|
||||
@ -225,7 +225,7 @@ void SDRThread::readLoop() {
|
||||
|
||||
updateGains();
|
||||
|
||||
while (!terminated.load()) {
|
||||
while (!stopping.load()) {
|
||||
updateSettings();
|
||||
readStream(iqDataOutQueue);
|
||||
}
|
||||
@ -360,7 +360,6 @@ void SDRThread::run() {
|
||||
//#endif
|
||||
|
||||
std::cout << "SDR thread starting." << std::endl;
|
||||
terminated.store(false);
|
||||
|
||||
SDRDeviceInfo *activeDev = deviceInfo.load();
|
||||
|
||||
@ -380,8 +379,8 @@ void SDRThread::run() {
|
||||
|
||||
std::cout << "SDR thread done." << std::endl;
|
||||
|
||||
if (!terminated.load()) {
|
||||
terminated.store(true);
|
||||
if (!stopping.load()) {
|
||||
stopping.store(true);
|
||||
wxGetApp().sdrThreadNotify(SDRThread::SDR_THREAD_TERMINATED, "Done.");
|
||||
}
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ public:
|
||||
~SDRThread();
|
||||
enum SDRThreadState { SDR_THREAD_MESSAGE, SDR_THREAD_INITIALIZED, SDR_THREAD_TERMINATED, SDR_THREAD_FAILED };
|
||||
|
||||
void run();
|
||||
virtual void run();
|
||||
|
||||
SDRDeviceInfo *getDevice();
|
||||
void setDevice(SDRDeviceInfo *dev);
|
||||
|
Loading…
Reference in New Issue
Block a user