Another fix for the neverending hung application bugs:

- Clear up DemodulatorInstance garbage collection, only do it in a single thread (AppFrame::OnIdle)
- Add a specific mutex lock for the list of deleted demodulators,
- Add a specific lock to control DemodulatorInstance thread lifetimes, protecting IsTerminated() of concurrent join and delete
This commit is contained in:
vsonnier 2017-05-24 18:55:37 +02:00
parent 4b323e9863
commit 77a82e1617
8 changed files with 33 additions and 21 deletions

View File

@ -1612,6 +1612,9 @@ void AppFrame::OnIdle(wxIdleEvent& event) {
updateDeviceParams(); updateDeviceParams();
} }
//try to garbage collect the retired demodulators.
wxGetApp().getDemodMgr().garbageCollect();
DemodulatorInstance *demod = wxGetApp().getDemodMgr().getLastActiveDemodulator(); DemodulatorInstance *demod = wxGetApp().getDemodMgr().getLastActiveDemodulator();
if (demod && demod->isModemInitialized()) { if (demod && demod->isModemInitialized()) {

View File

@ -78,6 +78,7 @@ DemodulatorInstance::DemodulatorInstance() {
} }
DemodulatorInstance::~DemodulatorInstance() { DemodulatorInstance::~DemodulatorInstance() {
std::lock_guard < std::mutex > lockData(m_thread_control_mutex);
#if ENABLE_DIGITAL_LAB #if ENABLE_DIGITAL_LAB
delete activeOutput; delete activeOutput;
#endif #endif
@ -89,7 +90,7 @@ DemodulatorInstance::~DemodulatorInstance() {
delete threadQueueControl; delete threadQueueControl;
delete pipeAudioData; delete pipeAudioData;
wxGetApp().getBookmarkMgr().updateActiveList(); // wxGetApp().getBookmarkMgr().updateActiveList();
} }
void DemodulatorInstance::setVisualOutputQueue(DemodulatorThreadOutputQueue *tQueue) { void DemodulatorInstance::setVisualOutputQueue(DemodulatorThreadOutputQueue *tQueue) {
@ -97,6 +98,9 @@ void DemodulatorInstance::setVisualOutputQueue(DemodulatorThreadOutputQueue *tQu
} }
void DemodulatorInstance::run() { void DemodulatorInstance::run() {
std::lock_guard < std::mutex > lockData(m_thread_control_mutex);
if (active) { if (active) {
return; return;
} }
@ -128,7 +132,7 @@ void DemodulatorInstance::run() {
active = true; active = true;
wxGetApp().getBookmarkMgr().updateActiveList(); // wxGetApp().getBookmarkMgr().updateActiveList();
} }
void DemodulatorInstance::updateLabel(long long freq) { void DemodulatorInstance::updateLabel(long long freq) {
@ -166,7 +170,8 @@ void DemodulatorInstance::setLabel(std::string labelStr) {
bool DemodulatorInstance::isTerminated() { bool DemodulatorInstance::isTerminated() {
// std::lock_guard < std::mutex > lockData(m_thread_control_mutex);
bool audioTerminated = audioThread->isTerminated(); bool audioTerminated = audioThread->isTerminated();
bool demodTerminated = demodulatorThread->isTerminated(); bool demodTerminated = demodulatorThread->isTerminated();
bool preDemodTerminated = demodulatorPreThread->isTerminated(); bool preDemodTerminated = demodulatorPreThread->isTerminated();

View File

@ -130,7 +130,7 @@ public:
void closeOutput(); void closeOutput();
#endif #endif
protected: private:
DemodulatorThreadInputQueue* pipeIQInputData; DemodulatorThreadInputQueue* pipeIQInputData;
DemodulatorThreadPostInputQueue* pipeIQDemodData; DemodulatorThreadPostInputQueue* pipeIQDemodData;
AudioThreadInputQueue *pipeAudioData; AudioThreadInputQueue *pipeAudioData;
@ -138,7 +138,8 @@ protected:
DemodulatorThread *demodulatorThread; DemodulatorThread *demodulatorThread;
DemodulatorThreadControlCommandQueue *threadQueueControl; DemodulatorThreadControlCommandQueue *threadQueueControl;
private: //protects child thread creation and termination
mutable std::mutex m_thread_control_mutex;
std::atomic<std::string *> label; // std::atomic<std::string *> label; //
// User editable buffer, 16 bit string. // User editable buffer, 16 bit string.

View File

@ -163,6 +163,7 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) {
demod->terminate(); demod->terminate();
//Do not cleanup immediatly //Do not cleanup immediatly
std::lock_guard < std::mutex > lock_deleted(deleted_demods_busy);
demods_deleted.push_back(demod); demods_deleted.push_back(demod);
} }
@ -225,10 +226,7 @@ void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool tempo
} }
#endif #endif
wxGetApp().getBookmarkMgr().updateActiveList(); wxGetApp().getBookmarkMgr().updateActiveList();
} else { }
std::lock_guard < std::recursive_mutex > lock(demods_busy);
garbageCollect();
}
if (activeVisualDemodulator.load()) { if (activeVisualDemodulator.load()) {
activeVisualDemodulator.load()->setVisualOutputQueue(nullptr); activeVisualDemodulator.load()->setVisualOutputQueue(nullptr);
@ -280,8 +278,9 @@ DemodulatorInstance *DemodulatorMgr::getLastDemodulatorWith(const std::string& t
return nullptr; return nullptr;
} }
//Private internal method, no need to protect it with demods_busy
void DemodulatorMgr::garbageCollect() { void DemodulatorMgr::garbageCollect() {
std::lock_guard < std::mutex > lock(deleted_demods_busy);
std::vector<DemodulatorInstance *>::iterator it = demods_deleted.begin(); std::vector<DemodulatorInstance *>::iterator it = demods_deleted.begin();
@ -290,11 +289,13 @@ void DemodulatorMgr::garbageCollect() {
if ((*it)->isTerminated()) { if ((*it)->isTerminated()) {
DemodulatorInstance *deleted = (*it); DemodulatorInstance *deleted = (*it);
std::cout << "Garbage collected demodulator instance '" << deleted->getLabel() << "'... " << std::endl << std::flush;
demods_deleted.erase(it);
delete deleted; delete deleted;
it = demods_deleted.erase(it); //only garbage collect 1 demod at a time.
return;
std::cout << "Garbage collected demodulator instance '" << deleted->getLabel() << "'... " << std::endl << std::flush;
} }
else { else {
it++; it++;
@ -431,7 +432,6 @@ void DemodulatorMgr::saveInstance(DataNode *node, DemodulatorInstance *inst) {
*settingsNode->newChild(msi->first.c_str()) = msi->second; *settingsNode->newChild(msi->first.c_str()) = msi->second;
} }
} }
} }
DemodulatorInstance *DemodulatorMgr::loadInstance(DataNode *node) { DemodulatorInstance *DemodulatorMgr::loadInstance(DataNode *node) {

View File

@ -67,10 +67,11 @@ public:
void saveInstance(DataNode *node, DemodulatorInstance *inst); void saveInstance(DataNode *node, DemodulatorInstance *inst);
DemodulatorInstance *loadInstance(DataNode *node); DemodulatorInstance *loadInstance(DataNode *node);
//to be called periodically to cleanup removed demodulators.
void garbageCollect();
private: private:
void garbageCollect();
std::vector<DemodulatorInstance *> demods; std::vector<DemodulatorInstance *> demods;
std::vector<DemodulatorInstance *> demods_deleted; std::vector<DemodulatorInstance *> demods_deleted;
@ -91,6 +92,8 @@ private:
//protects access to demods lists and such, need to be recursive //protects access to demods lists and such, need to be recursive
//because of the usage of public re-entrant methods //because of the usage of public re-entrant methods
std::recursive_mutex demods_busy; std::recursive_mutex demods_busy;
mutable std::mutex deleted_demods_busy;
std::map<std::string, ModemSettings> lastModemSettings; std::map<std::string, ModemSettings> lastModemSettings;
std::map<int,RtAudio::DeviceInfo> outputDevices; std::map<int,RtAudio::DeviceInfo> outputDevices;

View File

@ -170,7 +170,7 @@ protected:
if (inp) { if (inp) {
OutputDataTypePtr outp = buffers.getBuffer(); OutputDataTypePtr outp = buffers.getBuffer();
//'deep copy of the contents //'deep copy' of the contents
(*outp) = (*inp); (*outp) = (*inp);
VisualProcessor<OutputDataType, OutputDataType>::distribute(outp); VisualProcessor<OutputDataType, OutputDataType>::distribute(outp);

View File

@ -221,8 +221,8 @@ void SDRPostThread::run() {
iqVisualQueue->flush(); iqVisualQueue->flush();
} }
buffers.purge(); // buffers.purge();
visualDataBuffers.purge(); // visualDataBuffers.purge();
// std::cout << "SDR post-processing thread done." << std::endl; // std::cout << "SDR post-processing thread done." << std::endl;
} }

View File

@ -36,7 +36,7 @@ public:
/*! Create safe blocking queue. */ /*! Create safe blocking queue. */
ThreadBlockingQueue() { ThreadBlockingQueue() {
//at least 1 (== Exchanger) //at least 1 (== Java SynchronizedQueue)
m_max_num_items = MIN_ITEM_NB; m_max_num_items = MIN_ITEM_NB;
}; };
@ -258,7 +258,7 @@ public:
} }
private: private:
//TODO: use a circular buffer structure ? (fixed array + modulo)
std::deque<T> m_queue; std::deque<T> m_queue;
mutable std::mutex m_mutex; mutable std::mutex m_mutex;