diff --git a/src/AppFrame.cpp b/src/AppFrame.cpp index ca5a8c1..6abd434 100644 --- a/src/AppFrame.cpp +++ b/src/AppFrame.cpp @@ -1612,6 +1612,9 @@ void AppFrame::OnIdle(wxIdleEvent& event) { updateDeviceParams(); } + //try to garbage collect the retired demodulators. + wxGetApp().getDemodMgr().garbageCollect(); + DemodulatorInstance *demod = wxGetApp().getDemodMgr().getLastActiveDemodulator(); if (demod && demod->isModemInitialized()) { diff --git a/src/demod/DemodulatorInstance.cpp b/src/demod/DemodulatorInstance.cpp index 355daa6..9605707 100644 --- a/src/demod/DemodulatorInstance.cpp +++ b/src/demod/DemodulatorInstance.cpp @@ -78,6 +78,7 @@ DemodulatorInstance::DemodulatorInstance() { } DemodulatorInstance::~DemodulatorInstance() { + std::lock_guard < std::mutex > lockData(m_thread_control_mutex); #if ENABLE_DIGITAL_LAB delete activeOutput; #endif @@ -89,7 +90,7 @@ DemodulatorInstance::~DemodulatorInstance() { delete threadQueueControl; delete pipeAudioData; - wxGetApp().getBookmarkMgr().updateActiveList(); + // wxGetApp().getBookmarkMgr().updateActiveList(); } void DemodulatorInstance::setVisualOutputQueue(DemodulatorThreadOutputQueue *tQueue) { @@ -97,6 +98,9 @@ void DemodulatorInstance::setVisualOutputQueue(DemodulatorThreadOutputQueue *tQu } void DemodulatorInstance::run() { + + std::lock_guard < std::mutex > lockData(m_thread_control_mutex); + if (active) { return; } @@ -128,7 +132,7 @@ void DemodulatorInstance::run() { active = true; - wxGetApp().getBookmarkMgr().updateActiveList(); + // wxGetApp().getBookmarkMgr().updateActiveList(); } void DemodulatorInstance::updateLabel(long long freq) { @@ -166,7 +170,8 @@ void DemodulatorInstance::setLabel(std::string labelStr) { bool DemodulatorInstance::isTerminated() { - // + std::lock_guard < std::mutex > lockData(m_thread_control_mutex); + bool audioTerminated = audioThread->isTerminated(); bool demodTerminated = demodulatorThread->isTerminated(); bool preDemodTerminated = demodulatorPreThread->isTerminated(); diff --git a/src/demod/DemodulatorInstance.h b/src/demod/DemodulatorInstance.h index 749c6ec..a5be9d1 100644 --- a/src/demod/DemodulatorInstance.h +++ b/src/demod/DemodulatorInstance.h @@ -130,7 +130,7 @@ public: void closeOutput(); #endif -protected: +private: DemodulatorThreadInputQueue* pipeIQInputData; DemodulatorThreadPostInputQueue* pipeIQDemodData; AudioThreadInputQueue *pipeAudioData; @@ -138,7 +138,8 @@ protected: DemodulatorThread *demodulatorThread; DemodulatorThreadControlCommandQueue *threadQueueControl; -private: + //protects child thread creation and termination + mutable std::mutex m_thread_control_mutex; std::atomic label; // // User editable buffer, 16 bit string. diff --git a/src/demod/DemodulatorMgr.cpp b/src/demod/DemodulatorMgr.cpp index c0a582d..761a6b6 100644 --- a/src/demod/DemodulatorMgr.cpp +++ b/src/demod/DemodulatorMgr.cpp @@ -163,6 +163,7 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) { demod->terminate(); //Do not cleanup immediatly + std::lock_guard < std::mutex > lock_deleted(deleted_demods_busy); demods_deleted.push_back(demod); } @@ -225,10 +226,7 @@ void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool tempo } #endif wxGetApp().getBookmarkMgr().updateActiveList(); - } else { - std::lock_guard < std::recursive_mutex > lock(demods_busy); - garbageCollect(); - } + } if (activeVisualDemodulator.load()) { activeVisualDemodulator.load()->setVisualOutputQueue(nullptr); @@ -280,8 +278,9 @@ DemodulatorInstance *DemodulatorMgr::getLastDemodulatorWith(const std::string& t return nullptr; } -//Private internal method, no need to protect it with demods_busy void DemodulatorMgr::garbageCollect() { + + std::lock_guard < std::mutex > lock(deleted_demods_busy); std::vector::iterator it = demods_deleted.begin(); @@ -290,11 +289,13 @@ void DemodulatorMgr::garbageCollect() { if ((*it)->isTerminated()) { DemodulatorInstance *deleted = (*it); + + std::cout << "Garbage collected demodulator instance '" << deleted->getLabel() << "'... " << std::endl << std::flush; + demods_deleted.erase(it); delete deleted; - it = demods_deleted.erase(it); - - std::cout << "Garbage collected demodulator instance '" << deleted->getLabel() << "'... " << std::endl << std::flush; + //only garbage collect 1 demod at a time. + return; } else { it++; @@ -431,7 +432,6 @@ void DemodulatorMgr::saveInstance(DataNode *node, DemodulatorInstance *inst) { *settingsNode->newChild(msi->first.c_str()) = msi->second; } } - } DemodulatorInstance *DemodulatorMgr::loadInstance(DataNode *node) { diff --git a/src/demod/DemodulatorMgr.h b/src/demod/DemodulatorMgr.h index ca65c22..e6887f1 100644 --- a/src/demod/DemodulatorMgr.h +++ b/src/demod/DemodulatorMgr.h @@ -67,10 +67,11 @@ public: void saveInstance(DataNode *node, DemodulatorInstance *inst); DemodulatorInstance *loadInstance(DataNode *node); + + //to be called periodically to cleanup removed demodulators. + void garbageCollect(); private: - - void garbageCollect(); std::vector demods; std::vector demods_deleted; @@ -91,6 +92,8 @@ private: //protects access to demods lists and such, need to be recursive //because of the usage of public re-entrant methods std::recursive_mutex demods_busy; + + mutable std::mutex deleted_demods_busy; std::map lastModemSettings; std::map outputDevices; diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index 7d8110c..c7f77be 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -170,7 +170,7 @@ protected: if (inp) { OutputDataTypePtr outp = buffers.getBuffer(); - //'deep copy of the contents + //'deep copy' of the contents (*outp) = (*inp); VisualProcessor::distribute(outp); diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index db82fd7..01298e2 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -221,8 +221,8 @@ void SDRPostThread::run() { iqVisualQueue->flush(); } - buffers.purge(); - visualDataBuffers.purge(); + // buffers.purge(); + // visualDataBuffers.purge(); // std::cout << "SDR post-processing thread done." << std::endl; } diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h index 47819d0..d4b7c1e 100644 --- a/src/util/ThreadBlockingQueue.h +++ b/src/util/ThreadBlockingQueue.h @@ -36,7 +36,7 @@ public: /*! Create safe blocking queue. */ ThreadBlockingQueue() { - //at least 1 (== Exchanger) + //at least 1 (== Java SynchronizedQueue) m_max_num_items = MIN_ITEM_NB; }; @@ -258,7 +258,7 @@ public: } private: - //TODO: use a circular buffer structure ? (fixed array + modulo) + std::deque m_queue; mutable std::mutex m_mutex;