diff --git a/src/IOThread.cpp b/src/IOThread.cpp index d57f5ba..b3ffb0a 100644 --- a/src/IOThread.cpp +++ b/src/IOThread.cpp @@ -1,5 +1,8 @@ #include "IOThread.h" +std::mutex ReBufferGC::g_mutex; +std::set ReBufferGC::garbage; + IOThread::IOThread() { terminated.store(false); } diff --git a/src/IOThread.h b/src/IOThread.h index ee93f6f..0197894 100644 --- a/src/IOThread.h +++ b/src/IOThread.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -23,6 +24,16 @@ class ReferenceCounter { public: +// void setIndex(int idx) { +// std::lock_guard < std::recursive_mutex > lock(m_mutex); +// index = idx; +// } + +// int getIndex() { +// std::lock_guard < std::recursive_mutex > lock(m_mutex); +// return index; +// } + void setRefCount(int rc) { std::lock_guard < std::recursive_mutex > lock(m_mutex); refCount = rc; @@ -49,17 +60,55 @@ protected: private: int refCount; +// int index; }; #define REBUFFER_GC_LIMIT 100 +class ReBufferGC { +public: + static void garbageCollect() { + std::lock_guard < std::mutex > lock(g_mutex); + + std::deque garbageRemoval; + for (typename std::set::iterator i = garbage.begin(); i != garbage.end(); i++) { + if ((*i)->getRefCount() <= 0) { + garbageRemoval.push_back(*i); + } + else { +// std::cout << "Garbage in queue buffer idx #" << (*i)->getIndex() << ", " << (*i)->getRefCount() << " usage(s)" << std::endl; + std::cout << "Garbage in queue buffer with " << (*i)->getRefCount() << " usage(s)" << std::endl; + } + } + if ( garbageRemoval.size() ) { + std::cout << "Garbage collecting " << garbageRemoval.size() << " ReBuffer(s)" << std::endl; + while (!garbageRemoval.empty()) { + ReferenceCounter *ref = garbageRemoval.back(); + garbageRemoval.pop_back(); + garbage.erase(ref); + delete ref; + } + } + } + + static void addGarbage(ReferenceCounter *ref) { + std::lock_guard < std::mutex > lock(g_mutex); + garbage.insert(ref); + } + +private: + static std::mutex g_mutex; + static std::set garbage; +}; + + template class ReBuffer { public: ReBuffer(std::string bufferId) : bufferId(bufferId) { - +// indexCounter.store(0); } BufferType *getBuffer() { @@ -69,7 +118,7 @@ public: for (outputBuffersI = outputBuffers.begin(); outputBuffersI != outputBuffers.end(); outputBuffersI++) { if (buf == nullptr && (*outputBuffersI)->getRefCount() <= 0) { buf = (*outputBuffersI); - buf->setRefCount(0); + buf->setRefCount(1); } else if ((*outputBuffersI)->getRefCount() <= 0) { (*outputBuffersI)->decRefCount(); } @@ -81,6 +130,7 @@ public: outputBuffers.pop_back(); delete ref; } +// buf->setIndex(indexCounter++); return buf; } @@ -90,6 +140,8 @@ public: } buf = new BufferType(); + buf->setRefCount(1); +// buf->setIndex(indexCounter++); outputBuffers.push_back(buf); return buf; @@ -97,17 +149,28 @@ public: void purge() { std::lock_guard < std::mutex > lock(m_mutex); +// if (bufferId == "DemodulatorThreadBuffers") { +// std::cout << "'" << bufferId << "' purging.. total indexes: " << indexCounter.load() << std::endl; +// } while (!outputBuffers.empty()) { BufferType *ref = outputBuffers.front(); outputBuffers.pop_front(); - delete ref; + if (ref->getRefCount() <= 0) { + delete ref; + } else { + // Something isn't done with it yet; throw it on the pile.. keep this as a bug indicator for now.. + std::cout << "'" << bufferId << "' pushed garbage.." << std::endl; + ReBufferGC::addGarbage(ref); + } } } -private: + + private: std::string bufferId; std::deque outputBuffers; typename std::deque::iterator outputBuffersI; mutable std::mutex m_mutex; +// std::atomic_int indexCounter; }; diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index 62cec69..0e400d0 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -89,8 +89,6 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned continue; } -// std::lock_guard < std::mutex > lock(srcmix->currentInput->m_mutex); - if (srcmix->currentInput->sampleRate != src->getSampleRate()) { while (srcmix->inputQueue->size()) { srcmix->inputQueue->pop(srcmix->currentInput); @@ -393,6 +391,20 @@ void AudioThread::run() { setSampleRate(command.int_value); } } + + // Drain any remaining inputs + if (inputQueue) while (!inputQueue->empty()) { + AudioThreadInput *ref; + inputQueue->pop(ref); + if (ref) { + ref->decRefCount(); + } + } + + if (currentInput) { + currentInput->setRefCount(0); + currentInput = nullptr; + } if (deviceController[parameters.deviceId] != this) { deviceController[parameters.deviceId]->removeThread(this); @@ -408,7 +420,7 @@ void AudioThread::run() { e.printMessage(); } } - + if (threadQueueNotify != NULL) { DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED); tCmd.context = this; @@ -431,23 +443,19 @@ void AudioThread::setActive(bool state) { AudioThreadInput *dummy; if (state && !active && inputQueue) { + deviceController[parameters.deviceId]->bindThread(this); + } else if (!state && active) { + deviceController[parameters.deviceId]->removeThread(this); + } + + // Activity state changing, clear any inputs + if(inputQueue) { while (!inputQueue->empty()) { // flush queue inputQueue->pop(dummy); if (dummy) { dummy->decRefCount(); } } - deviceController[parameters.deviceId]->bindThread(this); - } else if (!state && active) { - deviceController[parameters.deviceId]->removeThread(this); - if(inputQueue) { - while (!inputQueue->empty()) { // flush queue - inputQueue->pop(dummy); - if (dummy) { - dummy->decRefCount(); - } - } - } } active = state; } diff --git a/src/demod/DemodulatorMgr.cpp b/src/demod/DemodulatorMgr.cpp index 95970be..64b9d82 100644 --- a/src/demod/DemodulatorMgr.cpp +++ b/src/demod/DemodulatorMgr.cpp @@ -212,6 +212,7 @@ void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool tempo #endif } else { garbageCollect(); + ReBufferGC::garbageCollect(); } if (activeVisualDemodulator) { diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index 31f9dfc..e1a438c 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -271,6 +271,11 @@ void DemodulatorPreThread::run() { } } + while (!iqOutputQueue->empty()) { + DemodulatorThreadPostIQData *tmp; + iqOutputQueue->pop(tmp); + tmp->decRefCount(); + } buffers.purge(); DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED); diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 254aa68..9ec0cab 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -132,13 +132,11 @@ void DemodulatorThread::run() { ati->sampleRate = cModemKit->audioSampleRate; ati->inputRate = inp->sampleRate; - ati->setRefCount(1); } else if (modemDigital != nullptr) { ati = outputBuffers.getBuffer(); ati->sampleRate = cModemKit->sampleRate; ati->inputRate = inp->sampleRate; - ati->setRefCount(1); } cModem->demodulate(cModemKit, &modemData, ati); @@ -173,7 +171,7 @@ void DemodulatorThread::run() { } } } else if (ati) { - ati->decRefCount(); + ati->setRefCount(0); ati = nullptr; } @@ -277,15 +275,25 @@ void DemodulatorThread::run() { } // end while !terminated + // Purge any unused inputs + while (!iqInputQueue->empty()) { + DemodulatorThreadPostIQData *ref; + iqInputQueue->pop(ref); + if (ref) { // May have other consumers; just decrement + ref->decRefCount(); + } + } + while (!audioOutputQueue->empty()) { + AudioThreadInput *ref; + audioOutputQueue->pop(ref); + if (ref) { // Originated here; set RefCount to 0 + ref->setRefCount(0); + } + } outputBuffers.purge(); //Guard the cleanup of audioVisOutputQueue properly. std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue); - -// if (audioVisOutputQueue != nullptr && !audioVisOutputQueue->empty()) { -// AudioThreadInput *dummy_vis; -// audioVisOutputQueue->pop(dummy_vis); -// } DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_TERMINATED); tCmd.context = this; diff --git a/src/util/ThreadQueue.h b/src/util/ThreadQueue.h index e69265c..4d1f901 100644 --- a/src/util/ThreadQueue.h +++ b/src/util/ThreadQueue.h @@ -231,8 +231,9 @@ public: /** * Remove any items in the queue. */ - void flush() const { + void flush() { std::lock_guard < std::mutex > lock(m_mutex); + m_queue = std::queue(); std::queue emptyQueue; std::swap(m_queue, emptyQueue); }