THREAD_CLEAN_PART2: Assure correct terminate()/isTerminated(delay)/delete sequence +

Removed the NotifyQueue between DemodulatorInstance participants, actually not needed
since DemodulatorInstance::IsTerminated() is explicitly called on some events for cleanups

DELETE_CLEANUP: properly nullify deleted pointers when needed
This commit is contained in:
vsonnier 2016-07-03 09:47:28 +02:00
parent cf90a829b0
commit 567d84711f
16 changed files with 163 additions and 154 deletions

View File

@ -293,43 +293,63 @@ int CubicSDR::OnExit() {
demodMgr.terminateAll();
std::cout << "Terminating SDR thread.." << std::endl;
if (!sdrThread->isTerminated()) {
sdrThread->terminate();
if (t_SDR) {
t_SDR->join();
}
sdrThread->terminate();
sdrThread->isTerminated(1000);
if (t_SDR) {
t_SDR->join();
}
std::cout << "Terminating SDR post-processing thread.." << std::endl;
sdrPostThread->terminate();
t_PostSDR->join();
std::cout << "Terminating Visual Processor threads.." << std::endl;
spectrumVisualThread->terminate();
t_SpectrumVisual->join();
demodVisualThread->terminate();
t_DemodVisual->join();
//Poor man join
//Wait nicely
sdrPostThread->isTerminated(1000);
spectrumVisualThread->isTerminated(1000);
demodVisualThread->isTerminated(1000);
//Then join the thread themselves
t_PostSDR->join();
t_DemodVisual->join();
t_SpectrumVisual->join();
//Now only we can delete
delete sdrThread;
sdrThread = nullptr;
delete sdrPostThread;
sdrPostThread = nullptr;
delete t_PostSDR;
t_PostSDR = nullptr;
delete t_SpectrumVisual;
t_SpectrumVisual = nullptr;
delete spectrumVisualThread;
spectrumVisualThread = nullptr;
delete t_DemodVisual;
t_DemodVisual = nullptr;
delete demodVisualThread;
demodVisualThread = nullptr;
delete pipeIQVisualData;
pipeIQVisualData = nullptr;
delete pipeAudioVisualData;
pipeAudioVisualData = nullptr;
delete pipeSDRIQData;
pipeSDRIQData = nullptr;
delete m_glContext;
m_glContext = nullptr;
#ifdef __APPLE__
AudioThread::deviceCleanup();
@ -427,6 +447,7 @@ void CubicSDR::sdrThreadNotify(SDRThread::SDRThreadState state, std::string mess
if (state == SDRThread::SDR_THREAD_TERMINATED) {
t_SDR->join();
delete t_SDR;
t_SDR = nullptr;
}
if (state == SDRThread::SDR_THREAD_FAILED) {
notifyMessage = message;
@ -532,14 +553,15 @@ void CubicSDR::stopDevice(bool store) {
}
sdrThread->setDevice(nullptr);
if (!sdrThread->isTerminated()) {
sdrThread->terminate();
if (t_SDR) {
t_SDR->join();
delete t_SDR;
t_SDR = nullptr;
}
sdrThread->terminate();
sdrThread->isTerminated(1000);
if (t_SDR) {
t_SDR->join();
delete t_SDR;
t_SDR = nullptr;
}
}
void CubicSDR::reEnumerateDevices() {
@ -550,12 +572,14 @@ void CubicSDR::reEnumerateDevices() {
}
void CubicSDR::setDevice(SDRDeviceInfo *dev) {
if (!sdrThread->isTerminated()) {
sdrThread->terminate();
if (t_SDR) {
t_SDR->join();
delete t_SDR;
}
sdrThread->terminate();
sdrThread->isTerminated(1000);
if (t_SDR) {
t_SDR->join();
delete t_SDR;
t_SDR = nullptr;
}
for (SoapySDR::Kwargs::const_iterator i = settingArgs.begin(); i != settingArgs.end(); i++) {
@ -872,17 +896,27 @@ RigThread *CubicSDR::getRigThread() {
void CubicSDR::initRig(int rigModel, std::string rigPort, int rigSerialRate) {
if (rigThread) {
if (!rigThread->isTerminated()) {
rigThread->terminate();
}
rigThread->terminate();
rigThread->isTerminated(1000);
}
if (t_Rig && t_Rig->joinable()) {
t_Rig->join();
}
//now we can delete
if (rigThread) {
delete rigThread;
rigThread = nullptr;
}
if (t_Rig && t_Rig->joinable()) {
t_Rig->join();
if (t_Rig) {
delete t_Rig;
t_Rig = nullptr;
}
rigThread = new RigThread();
rigThread->initRig(rigModel, rigPort, rigSerialRate);
rigThread->setControlMode(wxGetApp().getConfig()->getRigControlMode());
@ -899,14 +933,24 @@ void CubicSDR::stopRig() {
}
if (rigThread) {
if (!rigThread->isTerminated()) {
rigThread->terminate();
}
rigThread->terminate();
rigThread->isTerminated(1000);
}
if (t_Rig && t_Rig->joinable()) {
t_Rig->join();
}
//now we can delete
if (rigThread) {
delete rigThread;
rigThread = nullptr;
}
if (t_Rig && t_Rig->joinable()) {
t_Rig->join();
if (t_Rig) {
delete t_Rig;
t_Rig = nullptr;
}

View File

@ -180,10 +180,10 @@ private:
DemodulatorMgr demodMgr;
long long frequency;
long long offset;
int ppm, snap;
long long sampleRate;
std::atomic_llong frequency;
std::atomic_llong offset;
std::atomic_int ppm, snap;
std::atomic_llong sampleRate;
std::atomic_bool agcMode;
SDRThread *sdrThread;
@ -224,7 +224,7 @@ private:
std::atomic_bool soloMode;
SDRDeviceInfo *stoppedDev;
#ifdef USE_HAMLIB
RigThread *rigThread;
RigThread* rigThread;
std::thread *t_Rig;
#endif
};

View File

@ -1,4 +1,5 @@
#include "IOThread.h"
#include <typeinfo>
std::mutex ReBufferGC::g_mutex;
std::set<ReferenceCounter *> ReBufferGC::garbage;
@ -122,5 +123,7 @@ bool IOThread::isTerminated(int waitMs) {
}
}
std::cout << "ERROR: thread '" << typeid(*this).name() << "' has not terminated in time ! (> " << waitMs << " ms)" << std::endl;
return terminated.load();
}

View File

@ -12,7 +12,7 @@ std::map<int, int> AudioThread::deviceSampleRate;
std::map<int, std::thread *> AudioThread::deviceThread;
AudioThread::AudioThread() : IOThread(),
currentInput(NULL), inputQueue(NULL), nBufferFrames(1024), threadQueueNotify(NULL), sampleRate(0) {
currentInput(NULL), inputQueue(NULL), nBufferFrames(1024), sampleRate(0) {
audioQueuePtr.store(0);
underflowCount.store(0);
@ -378,7 +378,6 @@ void AudioThread::run() {
// std::cout << "Audio thread started." << std::endl;
inputQueue = static_cast<AudioThreadInputQueue *>(getInputQueue("AudioDataInput"));
threadQueueNotify = static_cast<DemodulatorThreadCommandQueue*>(getOutputQueue("NotifyQueue"));
while (!stopping) {
AudioThreadCommand command;
@ -420,12 +419,7 @@ void AudioThread::run() {
e.printMessage();
}
}
if (threadQueueNotify != NULL) {
DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED);
tCmd.context = this;
threadQueueNotify->push(tCmd);
}
// std::cout << "Audio thread done." << std::endl;
}

View File

@ -86,7 +86,6 @@ private:
RtAudio::StreamOptions opts;
RtAudio::StreamParameters parameters;
AudioThreadCommandQueue cmdQueue;
DemodulatorThreadCommandQueue* threadQueueNotify;
int sampleRate;
public:

View File

@ -10,29 +10,7 @@
#include "IOThread.h"
class DemodulatorThread;
class DemodulatorThreadCommand {
public:
enum DemodulatorThreadCommandEnum {
DEMOD_THREAD_CMD_NULL,
DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED,
DEMOD_THREAD_CMD_DEMOD_TERMINATED,
DEMOD_THREAD_CMD_AUDIO_TERMINATED
};
DemodulatorThreadCommand() :
cmd(DEMOD_THREAD_CMD_NULL), context(NULL), llong_value(0) {
}
DemodulatorThreadCommand(DemodulatorThreadCommandEnum cmd) :
cmd(cmd), context(NULL), llong_value(0) {
}
DemodulatorThreadCommandEnum cmd;
void *context;
long long llong_value;
};
class DemodulatorThreadControlCommand {
public:
@ -120,5 +98,4 @@ public:
typedef ThreadQueue<DemodulatorThreadIQData *> DemodulatorThreadInputQueue;
typedef ThreadQueue<DemodulatorThreadPostIQData *> DemodulatorThreadPostInputQueue;
typedef ThreadQueue<DemodulatorThreadCommand> DemodulatorThreadCommandQueue;
typedef ThreadQueue<DemodulatorThreadControlCommand> DemodulatorThreadControlCommandQueue;

View File

@ -51,14 +51,12 @@ DemodulatorInstance::DemodulatorInstance() {
pipeIQInputData = new DemodulatorThreadInputQueue;
pipeIQDemodData = new DemodulatorThreadPostInputQueue;
pipeDemodNotify = new DemodulatorThreadCommandQueue;
audioThread = new AudioThread();
demodulatorPreThread = new DemodulatorPreThread(this);
demodulatorPreThread->setInputQueue("IQDataInput",pipeIQInputData);
demodulatorPreThread->setOutputQueue("IQDataOutput",pipeIQDemodData);
demodulatorPreThread->setOutputQueue("NotifyQueue",pipeDemodNotify);
pipeAudioData = new AudioThreadInputQueue;
threadQueueControl = new DemodulatorThreadControlCommandQueue;
@ -66,11 +64,9 @@ DemodulatorInstance::DemodulatorInstance() {
demodulatorThread = new DemodulatorThread(this);
demodulatorThread->setInputQueue("IQDataInput",pipeIQDemodData);
demodulatorThread->setInputQueue("ControlQueue",threadQueueControl);
demodulatorThread->setOutputQueue("NotifyQueue",pipeDemodNotify);
demodulatorThread->setOutputQueue("AudioDataOutput", pipeAudioData);
audioThread->setInputQueue("AudioDataInput", pipeAudioData);
audioThread->setOutputQueue("NotifyQueue", pipeDemodNotify);
}
DemodulatorInstance::~DemodulatorInstance() {
@ -82,7 +78,6 @@ DemodulatorInstance::~DemodulatorInstance() {
delete demodulatorPreThread;
delete pipeIQInputData;
delete pipeIQDemodData;
delete pipeDemodNotify;
delete threadQueueControl;
delete pipeAudioData;
}
@ -151,56 +146,59 @@ void DemodulatorInstance::setLabel(std::string labelStr) {
}
bool DemodulatorInstance::isTerminated() {
while (!pipeDemodNotify->empty()) {
DemodulatorThreadCommand cmd;
pipeDemodNotify->pop(cmd);
switch (cmd.cmd) {
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED:
if (t_Audio) {
t_Audio->join();
delete t_Audio;
t_Audio = nullptr;
}
break;
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_TERMINATED:
if (t_Demod) {
#ifdef __APPLE__
pthread_join(t_Demod, nullptr);
#else
t_Demod->join();
delete t_Demod;
#endif
t_Demod = nullptr;
}
#if ENABLE_DIGITAL_LAB
if (activeOutput) {
closeOutput();
}
#endif
break;
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED:
if (t_PreDemod) {
#ifdef __APPLE__
pthread_join(t_PreDemod, NULL);
#else
t_PreDemod->join();
delete t_PreDemod;
#endif
t_PreDemod = nullptr;
}
break;
default:
break;
}
}
//
bool audioTerminated = audioThread->isTerminated();
bool demodTerminated = demodulatorThread->isTerminated();
bool preDemodTerminated = demodulatorPreThread->isTerminated();
//Cleanup the worker threads, if the threads are indeed terminated
if (audioTerminated) {
if (t_Audio) {
t_Audio->join();
delete t_Audio;
t_Audio = nullptr;
}
}
if (demodTerminated) {
if (t_Demod) {
#ifdef __APPLE__
pthread_join(t_Demod, nullptr);
#else
t_Demod->join();
delete t_Demod;
#endif
t_Demod = nullptr;
}
#if ENABLE_DIGITAL_LAB
if (activeOutput) {
closeOutput();
}
#endif
}
if (preDemodTerminated) {
if (t_PreDemod) {
#ifdef __APPLE__
pthread_join(t_PreDemod, NULL);
#else
t_PreDemod->join();
delete t_PreDemod;
#endif
t_PreDemod = nullptr;
}
}
bool terminated = audioTerminated && demodTerminated && preDemodTerminated;
return terminated;

View File

@ -47,8 +47,6 @@ public:
void setVisualOutputQueue(DemodulatorThreadOutputQueue *tQueue);
DemodulatorThreadCommandQueue *getCommandQueue();
void run();
void terminate();
std::string getLabel();
@ -131,7 +129,6 @@ protected:
DemodulatorThreadInputQueue* pipeIQInputData;
DemodulatorThreadPostInputQueue* pipeIQDemodData;
AudioThreadInputQueue *pipeAudioData;
DemodulatorThreadCommandQueue* pipeDemodNotify;
DemodulatorPreThread *demodulatorPreThread;
DemodulatorThread *demodulatorThread;
DemodulatorThreadControlCommandQueue *threadQueueControl;

View File

@ -148,14 +148,18 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) {
if (i != demods.end()) {
demods.erase(i);
}
//Ask for termination
demod->terminate();
//Do not cleanup immediatly
demods_deleted.push_back(demod);
}
std::vector<DemodulatorInstance *> *DemodulatorMgr::getDemodulatorsAt(long long freq, int bandwidth) {
std::vector<DemodulatorInstance *> DemodulatorMgr::getDemodulatorsAt(long long freq, int bandwidth) {
std::lock_guard < std::recursive_mutex > lock(demods_busy);
std::vector<DemodulatorInstance *> *foundDemods = new std::vector<DemodulatorInstance *>();
std::vector<DemodulatorInstance *> foundDemods;
for (int i = 0, iMax = demods.size(); i < iMax; i++) {
DemodulatorInstance *testDemod = demods[i];
@ -167,7 +171,7 @@ std::vector<DemodulatorInstance *> *DemodulatorMgr::getDemodulatorsAt(long long
long long halfBuffer = bandwidth / 2;
if ((freq <= (freqTest + ((testDemod->getDemodulatorType() != "LSB")?halfBandwidthTest:0) + halfBuffer)) && (freq >= (freqTest - ((testDemod->getDemodulatorType() != "USB")?halfBandwidthTest:0) - halfBuffer))) {
foundDemods->push_back(testDemod);
foundDemods.push_back(testDemod);
}
}

View File

@ -14,7 +14,7 @@ public:
DemodulatorInstance *newThread();
std::vector<DemodulatorInstance *> &getDemodulators();
std::vector<DemodulatorInstance *> getOrderedDemodulators(bool actives = true);
std::vector<DemodulatorInstance *> *getDemodulatorsAt(long long freq, int bandwidth);
std::vector<DemodulatorInstance *> getDemodulatorsAt(long long freq, int bandwidth);
DemodulatorInstance *getPreviousDemodulator(DemodulatorInstance *demod, bool actives = true);
DemodulatorInstance *getNextDemodulator(DemodulatorInstance *demod, bool actives = true);
DemodulatorInstance *getLastDemodulator();

View File

@ -9,7 +9,7 @@
#include "CubicSDR.h"
#include "DemodulatorInstance.h"
DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance *parent) : IOThread(), iqResampler(NULL), iqResampleRatio(1), cModem(nullptr), cModemKit(nullptr), iqInputQueue(NULL), iqOutputQueue(NULL), threadQueueNotify(NULL)
DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance *parent) : IOThread(), iqResampler(NULL), iqResampleRatio(1), cModem(nullptr), cModemKit(nullptr), iqInputQueue(NULL), iqOutputQueue(NULL)
{
initialized.store(false);
this->parent = parent;
@ -58,7 +58,6 @@ void DemodulatorPreThread::run() {
iqInputQueue = static_cast<DemodulatorThreadInputQueue*>(getInputQueue("IQDataInput"));
iqOutputQueue = static_cast<DemodulatorThreadPostInputQueue*>(getOutputQueue("IQDataOutput"));
threadQueueNotify = static_cast<DemodulatorThreadCommandQueue*>(getOutputQueue("NotifyQueue"));
std::vector<liquid_float_complex> in_buf_data;
std::vector<liquid_float_complex> out_buf_data;
@ -276,11 +275,6 @@ void DemodulatorPreThread::run() {
tmp->decRefCount();
}
buffers.purge();
DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED);
tCmd.context = this;
threadQueueNotify->push(tCmd);
// std::cout << "Demodulator preprocessor thread done." << std::endl;
}
void DemodulatorPreThread::setDemodType(std::string demodType) {
@ -346,12 +340,22 @@ void DemodulatorPreThread::terminate() {
iqInputQueue->push(inp);
DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL);
workerQueue->push(command);
workerThread->terminate();
workerThread->isTerminated(1000);
t_Worker->join();
delete t_Worker;
t_Worker = nullptr;
delete workerThread;
workerThread = nullptr;
delete workerResults;
workerResults = nullptr;
delete workerQueue;
workerQueue = nullptr;
}
Modem *DemodulatorPreThread::getModem() {

View File

@ -79,5 +79,4 @@ protected:
DemodulatorThreadInputQueue* iqInputQueue;
DemodulatorThreadPostInputQueue* iqOutputQueue;
DemodulatorThreadCommandQueue* threadQueueNotify;
};

View File

@ -69,8 +69,7 @@ void DemodulatorThread::run() {
iqInputQueue = static_cast<DemodulatorThreadPostInputQueue*>(getInputQueue("IQDataInput"));
audioOutputQueue = static_cast<AudioThreadInputQueue*>(getOutputQueue("AudioDataOutput"));
threadQueueControl = static_cast<DemodulatorThreadControlCommandQueue *>(getInputQueue("ControlQueue"));
threadQueueNotify = static_cast<DemodulatorThreadCommandQueue*>(getOutputQueue("NotifyQueue"));
ModemIQData modemData;
while (!stopping) {
@ -290,13 +289,6 @@ void DemodulatorThread::run() {
}
outputBuffers.purge();
//Guard the cleanup of audioVisOutputQueue properly.
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue);
DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_TERMINATED);
tCmd.context = this;
threadQueueNotify->push(tCmd);
// std::cout << "Demodulator thread done." << std::endl;
}

View File

@ -56,7 +56,6 @@ protected:
AudioThreadInputQueue *audioOutputQueue = nullptr;
DemodulatorThreadOutputQueue* audioVisOutputQueue = nullptr;
DemodulatorThreadControlCommandQueue *threadQueueControl = nullptr;
DemodulatorThreadCommandQueue* threadQueueNotify = nullptr;
//protects the audioVisOutputQueue dynamic binding change at runtime (in DemodulatorMgr)
mutable std::mutex m_mutexAudioVisOutputQueue;

View File

@ -400,7 +400,6 @@ void GLFont::loadFontOnce() {
}
//2) then load from memory
lodepng::State state;
unsigned error = lodepng::decode(image, imgWidth, imgHeight, raw_image, png_size);
delete[] raw_image;

View File

@ -492,7 +492,7 @@ void WaterfallCanvas::OnIdle(wxIdleEvent &event) {
void WaterfallCanvas::updateHoverState() {
long long freqPos = getFrequencyAt(mouseTracker.getMouseX());
std::vector<DemodulatorInstance *> *demodsHover = wxGetApp().getDemodMgr().getDemodulatorsAt(freqPos, 15000);
std::vector<DemodulatorInstance *> demodsHover = wxGetApp().getDemodMgr().getDemodulatorsAt(freqPos, 15000);
wxGetApp().getDemodMgr().setActiveDemodulator(NULL);
@ -505,13 +505,13 @@ void WaterfallCanvas::updateHoverState() {
} else {
setStatusText("Click and drag to set the current demodulator range.");
}
} else if (demodsHover->size() && !shiftDown) {
} else if (demodsHover.size() && !shiftDown) {
long near_dist = getBandwidth();
DemodulatorInstance *activeDemodulator = NULL;
for (int i = 0, iMax = demodsHover->size(); i < iMax; i++) {
DemodulatorInstance *demod = (*demodsHover)[i];
for (int i = 0, iMax = demodsHover.size(); i < iMax; i++) {
DemodulatorInstance *demod = demodsHover[i];
long long freqDiff = demod->getFrequency() - freqPos;
long halfBw = (demod->getBandwidth() / 2);
long long currentBw = getBandwidth();
@ -574,18 +574,18 @@ void WaterfallCanvas::updateHoverState() {
mouseTracker.setHorizDragLock(false);
setStatusText("Click and drag to change demodulator frequency; SPACE or numeric key for direct input. [, ] to nudge, M for mute, D to delete, C to center, E to edit label.");
}
} else {
}
else {
SetCursor(wxCURSOR_CROSS);
nextDragState = WF_DRAG_NONE;
if (shiftDown) {
setStatusText("Click to create a new demodulator or hold ALT to drag range, SPACE or numeric key for direct center frequency input.");
} else {
}
else {
setStatusText(
"Click to set active demodulator frequency or hold ALT to drag range; hold SHIFT to create new. Right drag or wheel to Zoom. Arrow keys to navigate/zoom, C to center.");
"Click to set active demodulator frequency or hold ALT to drag range; hold SHIFT to create new. Right drag or wheel to Zoom. Arrow keys to navigate/zoom, C to center.");
}
}
delete demodsHover;
}
void WaterfallCanvas::OnMouseMoved(wxMouseEvent& event) {