From 037be13fac56361ec45505e73d90f5d5416bcff8 Mon Sep 17 00:00:00 2001 From: "Charles J. Cliffe" Date: Tue, 7 Jun 2016 19:54:36 -0400 Subject: [PATCH] Rebuffer Garbage collector; mostly to pinpoint/gracefully handle ReBuffer failures --- src/IOThread.cpp | 3 ++ src/IOThread.h | 49 ++++++++++++++++++++++++++-- src/demod/DemodulatorMgr.cpp | 1 + src/modules/modem/analog/ModemAM.cpp | 2 ++ 4 files changed, 53 insertions(+), 2 deletions(-) diff --git a/src/IOThread.cpp b/src/IOThread.cpp index d57f5ba..b3ffb0a 100644 --- a/src/IOThread.cpp +++ b/src/IOThread.cpp @@ -1,5 +1,8 @@ #include "IOThread.h" +std::mutex ReBufferGC::g_mutex; +std::set ReBufferGC::garbage; + IOThread::IOThread() { terminated.store(false); } diff --git a/src/IOThread.h b/src/IOThread.h index ee93f6f..7ec7165 100644 --- a/src/IOThread.h +++ b/src/IOThread.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -54,6 +55,43 @@ private: #define REBUFFER_GC_LIMIT 100 +class ReBufferGC { +public: + static void garbageCollect() { + std::lock_guard < std::mutex > lock(g_mutex); + + std::deque garbageRemoval; + for (typename std::set::iterator i = garbage.begin(); i != garbage.end(); i++) { + if ((*i)->getRefCount() <= 0) { + garbageRemoval.push_back(*i); + } + else { + std::cout << "Garbage in queue " << (*i)->getRefCount() << " usage(s)" << std::endl; + } + } + if ( garbageRemoval.size() ) { + std::cout << "Garbage collecting " << garbageRemoval.size() << " ReBuffer(s)" << std::endl; + while (!garbageRemoval.empty()) { + ReferenceCounter *ref = garbageRemoval.back(); + garbageRemoval.pop_back(); + garbage.erase(ref); + delete ref; + } + } + } + + static void addGarbage(ReferenceCounter *ref) { + std::lock_guard < std::mutex > lock(g_mutex); + std::cout << "Added garbage.." << std::endl; + garbage.insert(ref); + } + +private: + static std::mutex g_mutex; + static std::set garbage; +}; + + template class ReBuffer { @@ -100,10 +138,17 @@ public: while (!outputBuffers.empty()) { BufferType *ref = outputBuffers.front(); outputBuffers.pop_front(); - delete ref; + if (ref->getRefCount() <= 0) { + delete ref; + } else { + // Something isn't done with it yet; throw it on the pile.. + std::cout << bufferId << "pushed garbage.." << std::endl; + ReBufferGC::addGarbage(ref); + } } } -private: + + private: std::string bufferId; std::deque outputBuffers; typename std::deque::iterator outputBuffersI; diff --git a/src/demod/DemodulatorMgr.cpp b/src/demod/DemodulatorMgr.cpp index 95970be..64b9d82 100644 --- a/src/demod/DemodulatorMgr.cpp +++ b/src/demod/DemodulatorMgr.cpp @@ -212,6 +212,7 @@ void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool tempo #endif } else { garbageCollect(); + ReBufferGC::garbageCollect(); } if (activeVisualDemodulator) { diff --git a/src/modules/modem/analog/ModemAM.cpp b/src/modules/modem/analog/ModemAM.cpp index 6a5eb3c..af956cd 100644 --- a/src/modules/modem/analog/ModemAM.cpp +++ b/src/modules/modem/analog/ModemAM.cpp @@ -34,5 +34,7 @@ void ModemAM::demodulate(ModemKit *kit, ModemIQData *input, AudioThreadInput *au ampmodem_demodulate(demodAM, input->data[i], &demodOutputData[i]); } + input->decRefCount(); + buildAudioOutput(amkit,audioOut,true); }