diff --git a/src/AppFrame.cpp b/src/AppFrame.cpp index ca5a8c1..6abd434 100644 --- a/src/AppFrame.cpp +++ b/src/AppFrame.cpp @@ -1612,6 +1612,9 @@ void AppFrame::OnIdle(wxIdleEvent& event) { updateDeviceParams(); } + //try to garbage collect the retired demodulators. + wxGetApp().getDemodMgr().garbageCollect(); + DemodulatorInstance *demod = wxGetApp().getDemodMgr().getLastActiveDemodulator(); if (demod && demod->isModemInitialized()) { diff --git a/src/IOThread.cpp b/src/IOThread.cpp index 029fe73..074af53 100644 --- a/src/IOThread.cpp +++ b/src/IOThread.cpp @@ -4,9 +4,6 @@ #include "IOThread.h" #include -std::mutex ReBufferGC::g_mutex; -std::set ReBufferGC::garbage; - #define SPIN_WAIT_SLEEP_MS 5 IOThread::IOThread() { @@ -130,7 +127,7 @@ bool IOThread::isTerminated(int waitMs) { } } - std::cout << "ERROR: thread '" << typeid(*this).name() << "' has not terminated in time ! (> " << waitMs << " ms)" << std::endl; + std::cout << "ERROR: thread '" << typeid(*this).name() << "' has not terminated in time ! (> " << waitMs << " ms)" << std::endl << std::flush; return terminated.load(); } diff --git a/src/IOThread.h b/src/IOThread.h index 46ce897..3449ae2 100644 --- a/src/IOThread.h +++ b/src/IOThread.h @@ -5,13 +5,14 @@ #include #include -#include +#include #include #include #include #include #include - +#include +#include #include "ThreadBlockingQueue.h" #include "Timer.h" @@ -23,163 +24,122 @@ struct map_string_less : public std::binary_function +class ReBufferAge { public: + PtrType ptr; + int age; - //default constructor, initialized with refcont 1, sounds very natural - ReferenceCounter() { - refCount = 1; - } - -// void setIndex(int idx) { -// std::lock_guard < std::recursive_mutex > lock(m_mutex); -// index = idx; -// } - -// int getIndex() { -// std::lock_guard < std::recursive_mutex > lock(m_mutex); -// return index; -// } - - void setRefCount(int rc) { - std::lock_guard < std::recursive_mutex > lock(m_mutex); - refCount = rc; - } - - void decRefCount() { - std::lock_guard < std::recursive_mutex > lock(m_mutex); - refCount--; - } - - int getRefCount() { - std::lock_guard < std::recursive_mutex > lock(m_mutex); - return refCount; - } - - // Access to the own mutex protecting the ReferenceCounter, i.e the monitor of the class - std::recursive_mutex& getMonitor() const { - return m_mutex; - } - -protected: - //this is a basic mutex for all ReferenceCounter derivatives operations INCLUDING the counter itself for consistency ! - mutable std::recursive_mutex m_mutex; - -private: - int refCount; -// int index; + virtual ~ReBufferAge() {}; }; - #define REBUFFER_GC_LIMIT 100 +#define REBUFFER_WARNING_THRESHOLD 150 -class ReBufferGC { -public: - static void garbageCollect() { - std::lock_guard < std::mutex > lock(g_mutex); - - std::deque garbageRemoval; - for (typename std::set::iterator i = garbage.begin(); i != garbage.end(); i++) { - if ((*i)->getRefCount() <= 0) { - garbageRemoval.push_back(*i); - } - else { -// std::cout << "Garbage in queue buffer idx #" << (*i)->getIndex() << ", " << (*i)->getRefCount() << " usage(s)" << std::endl; - std::cout << "Garbage in queue buffer with " << (*i)->getRefCount() << " usage(s)" << std::endl; - } - } - if ( garbageRemoval.size() ) { - std::cout << "Garbage collecting " << garbageRemoval.size() << " ReBuffer(s)" << std::endl; - while (!garbageRemoval.empty()) { - ReferenceCounter *ref = garbageRemoval.back(); - garbageRemoval.pop_back(); - garbage.erase(ref); - delete ref; - } - } - } - - static void addGarbage(ReferenceCounter *ref) { - std::lock_guard < std::mutex > lock(g_mutex); - garbage.insert(ref); - } - -private: - static std::mutex g_mutex; - static std::set garbage; -}; - - -template +template class ReBuffer { + typedef typename std::shared_ptr ReBufferPtr; + public: ReBuffer(std::string bufferId) : bufferId(bufferId) { -// indexCounter.store(0); } - BufferType *getBuffer() { + /// Return a new ReBuffer_ptr usable by the application. + ReBufferPtr getBuffer() { + std::lock_guard < std::mutex > lock(m_mutex); - BufferType* buf = nullptr; - for (outputBuffersI = outputBuffers.begin(); outputBuffersI != outputBuffers.end(); outputBuffersI++) { - if (buf == nullptr && (*outputBuffersI)->getRefCount() <= 0) { - buf = (*outputBuffersI); - buf->setRefCount(1); - } else if ((*outputBuffersI)->getRefCount() <= 0) { - (*outputBuffersI)->decRefCount(); + // iterate the ReBuffer_ptr list: if the std::shared_ptr count == 1, it means + //it is only referenced in outputBuffers itself, so available for re-use. + //else if the std::shared_ptr count <= 1, make it age. + //else the ReBuffer_ptr is in use, don't use it. + + ReBufferPtr buf = nullptr; + + outputBuffersI it = outputBuffers.begin(); + + while (it != outputBuffers.end()) { + + long use = it->ptr.use_count(); + //1. If we encounter a shared_ptr with a use count of 0, this + //is a bug since it is supposed to be at least 1, because it is referenced here. + //in this case, purge it from here and trace. + if (use == 0) { + it = outputBuffers.erase(it); + std::cout << "Warning: in ReBuffer '" << bufferId << "' count '" << outputBuffers.size() << "', found 1 dangling buffer !" << std::endl << std::flush; + } + else if (use == 1) { + if (buf == nullptr) { + it->age = 1; //select this one. + buf = it->ptr; + //std::cout << "**" << std::flush; + it++; + } + else { + //make the other unused buffers age + it->age--; + it++; + } } - } - - if (buf != nullptr) { - if (outputBuffers.back()->getRefCount() < -REBUFFER_GC_LIMIT) { - BufferType *ref = outputBuffers.back(); + else { + it++; + } + } //end while + + //2.1 Garbage collect the oldest (last element) if it aged too much, and return the buffer + if (buf != nullptr) { + + if (outputBuffers.back().age < -REBUFFER_GC_LIMIT) { + //by the nature of the shared_ptr, memory will ne deallocated automatically. outputBuffers.pop_back(); - delete ref; + //std::cout << "--" << std::flush; } -// buf->setIndex(indexCounter++); return buf; } - -#define REBUFFER_WARNING_THRESHOLD 100 - if (outputBuffers.size() > REBUFFER_WARNING_THRESHOLD) { - std::cout << "Warning: ReBuffer '" << bufferId << "' count '" << outputBuffers.size() << "' exceeds threshold of '" << REBUFFER_WARNING_THRESHOLD << "'" << std::endl; - } - //by default created with refcount = 1 - buf = new BufferType(); -// buf->setIndex(indexCounter++); - outputBuffers.push_back(buf); + if (outputBuffers.size() > REBUFFER_WARNING_THRESHOLD) { + std::cout << "Warning: ReBuffer '" << bufferId << "' count '" << outputBuffers.size() << "' exceeds threshold of '" << REBUFFER_WARNING_THRESHOLD << "'" << std::endl << std::flush; + } - return buf; + //3.We need to allocate a new buffer. + ReBufferAge < ReBufferPtr > newBuffer; + + //careful here: newBuffer.ptr is already constructed, so we need to set "in place" its + //ownership to a (new BufferType()). + newBuffer.ptr.reset(new BufferType()); + newBuffer.age = 1; + + outputBuffers.push_back(newBuffer); + + //std::cout << "++" << std::flush; + + return newBuffer.ptr; } + /// Purge the cache. void purge() { std::lock_guard < std::mutex > lock(m_mutex); -// if (bufferId == "DemodulatorThreadBuffers") { -// std::cout << "'" << bufferId << "' purging.. total indexes: " << indexCounter.load() << std::endl; -// } - while (!outputBuffers.empty()) { - BufferType *ref = outputBuffers.front(); - outputBuffers.pop_front(); - if (ref->getRefCount() <= 0) { - delete ref; - } else { - // Something isn't done with it yet; throw it on the pile.. keep this as a bug indicator for now.. - std::cout << "'" << bufferId << "' pushed garbage.." << std::endl; - ReBufferGC::addGarbage(ref); - } - } + + // since outputBuffers are full std::shared_ptr, + //purging if will effectively loose the local reference, + // so the std::shared_ptr will naturally be deallocated + //when their time comes. + outputBuffers.clear(); } - private: +private: + + //name of the buffer cache kind std::string bufferId; - std::deque outputBuffers; - typename std::deque::iterator outputBuffersI; + + //the ReBuffer cache + std::vector< ReBufferAge < ReBufferPtr > > outputBuffers; + + typedef typename std::vector< ReBufferAge < ReBufferPtr > >::iterator outputBuffersI; + + //mutex protecting access to outputBuffers. mutable std::mutex m_mutex; -// std::atomic_int indexCounter; }; diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index e0d4ad1..a96cf34 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -11,6 +11,8 @@ #include #include +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) std::map AudioThread::deviceController; std::map AudioThread::deviceSampleRate; @@ -123,7 +125,7 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned if (srcmix->currentInput->sampleRate == src->getSampleRate()) { break; } - srcmix->currentInput->decRefCount(); + } srcmix->currentInput = nullptr; } //end while @@ -140,7 +142,7 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned if (!srcmix->inputQueue->empty()) { srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { - srcmix->currentInput->decRefCount(); + srcmix->currentInput = nullptr; } @@ -160,7 +162,7 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) { srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { - srcmix->currentInput->decRefCount(); + srcmix->currentInput = nullptr; } @@ -187,7 +189,7 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) { srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { - srcmix->currentInput->decRefCount(); + srcmix->currentInput = nullptr; } @@ -429,7 +431,9 @@ void AudioThread::run() { while (!stopping) { AudioThreadCommand command; - cmdQueue.pop(command); + if (!cmdQueue.pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE) { setupDevice(command.int_value); @@ -444,20 +448,13 @@ void AudioThread::run() { std::lock_guard lock(m_mutex); // Drain any remaining inputs, with a non-blocking pop - AudioThreadInput *ref; - while (inputQueue && inputQueue->try_pop(ref)) { - - if (ref) { - ref->decRefCount(); - } - } //end while + if (inputQueue != nullptr) { + inputQueue->flush(); + } //Nullify currentInput... - if (currentInput) { - currentInput->setRefCount(0); - currentInput = nullptr; - } - + currentInput = nullptr; + //Stop if (deviceController[parameters.deviceId] != this) { deviceController[parameters.deviceId]->removeThread(this); @@ -494,7 +491,6 @@ void AudioThread::setActive(bool state) { std::lock_guard lock(m_mutex); - AudioThreadInput *dummy; if (state && !active && inputQueue) { deviceController[parameters.deviceId]->bindThread(this); } else if (!state && active) { @@ -504,12 +500,7 @@ void AudioThread::setActive(bool state) { // Activity state changing, clear any inputs if(inputQueue) { - while (inputQueue->try_pop(dummy)) { // flush queue, non-blocking pop - - if (dummy) { - dummy->decRefCount(); - } - } + inputQueue->flush(); } active = state; } diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index ce349c0..119059e 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -8,12 +8,12 @@ #include #include #include - +#include #include "ThreadBlockingQueue.h" #include "RtAudio.h" #include "DemodDefs.h" -class AudioThreadInput: public ReferenceCounter { +class AudioThreadInput { public: long long frequency; int inputRate; @@ -29,10 +29,12 @@ public: } ~AudioThreadInput() { - std::lock_guard < std::recursive_mutex > lock(m_mutex); + } }; +typedef std::shared_ptr AudioThreadInputPtr; + class AudioThreadCommand { public: enum AudioThreadCommandEnum { @@ -47,12 +49,12 @@ public: int int_value; }; -typedef ThreadBlockingQueue AudioThreadInputQueue; +typedef ThreadBlockingQueue AudioThreadInputQueue; typedef ThreadBlockingQueue AudioThreadCommandQueue; class AudioThread : public IOThread { public: - AudioThreadInput *currentInput; + AudioThreadInputPtr currentInput; AudioThreadInputQueue *inputQueue; std::atomic_uint audioQueuePtr; std::atomic_uint underflowCount; diff --git a/src/demod/DemodDefs.h b/src/demod/DemodDefs.h index d4a922a..ae6c239 100644 --- a/src/demod/DemodDefs.h +++ b/src/demod/DemodDefs.h @@ -9,6 +9,7 @@ #include #include #include +#include #include "IOThread.h" @@ -29,7 +30,7 @@ public: std::string demodType; }; -class DemodulatorThreadIQData: public ReferenceCounter { +class DemodulatorThreadIQData { public: long long frequency; long long sampleRate; @@ -48,7 +49,7 @@ public: return *this; } - ~DemodulatorThreadIQData() { + virtual ~DemodulatorThreadIQData() { } }; @@ -56,7 +57,7 @@ public: class Modem; class ModemKit; -class DemodulatorThreadPostIQData: public ReferenceCounter { +class DemodulatorThreadPostIQData { public: std::vector data; @@ -71,13 +72,13 @@ public: } - ~DemodulatorThreadPostIQData() { - std::lock_guard < std::recursive_mutex > lock(m_mutex); + virtual ~DemodulatorThreadPostIQData() { + } }; -class DemodulatorThreadAudioData: public ReferenceCounter { +class DemodulatorThreadAudioData { public: long long frequency; unsigned int sampleRate; @@ -95,11 +96,13 @@ public: } - ~DemodulatorThreadAudioData() { + virtual ~DemodulatorThreadAudioData() { } }; +typedef std::shared_ptr DemodulatorThreadIQDataPtr; +typedef std::shared_ptr DemodulatorThreadPostIQDataPtr; -typedef ThreadBlockingQueue DemodulatorThreadInputQueue; -typedef ThreadBlockingQueue DemodulatorThreadPostInputQueue; +typedef ThreadBlockingQueue< DemodulatorThreadIQDataPtr > DemodulatorThreadInputQueue; +typedef ThreadBlockingQueue< DemodulatorThreadPostIQDataPtr > DemodulatorThreadPostInputQueue; typedef ThreadBlockingQueue DemodulatorThreadControlCommandQueue; diff --git a/src/demod/DemodulatorInstance.cpp b/src/demod/DemodulatorInstance.cpp index 355daa6..9605707 100644 --- a/src/demod/DemodulatorInstance.cpp +++ b/src/demod/DemodulatorInstance.cpp @@ -78,6 +78,7 @@ DemodulatorInstance::DemodulatorInstance() { } DemodulatorInstance::~DemodulatorInstance() { + std::lock_guard < std::mutex > lockData(m_thread_control_mutex); #if ENABLE_DIGITAL_LAB delete activeOutput; #endif @@ -89,7 +90,7 @@ DemodulatorInstance::~DemodulatorInstance() { delete threadQueueControl; delete pipeAudioData; - wxGetApp().getBookmarkMgr().updateActiveList(); + // wxGetApp().getBookmarkMgr().updateActiveList(); } void DemodulatorInstance::setVisualOutputQueue(DemodulatorThreadOutputQueue *tQueue) { @@ -97,6 +98,9 @@ void DemodulatorInstance::setVisualOutputQueue(DemodulatorThreadOutputQueue *tQu } void DemodulatorInstance::run() { + + std::lock_guard < std::mutex > lockData(m_thread_control_mutex); + if (active) { return; } @@ -128,7 +132,7 @@ void DemodulatorInstance::run() { active = true; - wxGetApp().getBookmarkMgr().updateActiveList(); + // wxGetApp().getBookmarkMgr().updateActiveList(); } void DemodulatorInstance::updateLabel(long long freq) { @@ -166,7 +170,8 @@ void DemodulatorInstance::setLabel(std::string labelStr) { bool DemodulatorInstance::isTerminated() { - // + std::lock_guard < std::mutex > lockData(m_thread_control_mutex); + bool audioTerminated = audioThread->isTerminated(); bool demodTerminated = demodulatorThread->isTerminated(); bool preDemodTerminated = demodulatorPreThread->isTerminated(); diff --git a/src/demod/DemodulatorInstance.h b/src/demod/DemodulatorInstance.h index 749c6ec..a5be9d1 100644 --- a/src/demod/DemodulatorInstance.h +++ b/src/demod/DemodulatorInstance.h @@ -130,7 +130,7 @@ public: void closeOutput(); #endif -protected: +private: DemodulatorThreadInputQueue* pipeIQInputData; DemodulatorThreadPostInputQueue* pipeIQDemodData; AudioThreadInputQueue *pipeAudioData; @@ -138,7 +138,8 @@ protected: DemodulatorThread *demodulatorThread; DemodulatorThreadControlCommandQueue *threadQueueControl; -private: + //protects child thread creation and termination + mutable std::mutex m_thread_control_mutex; std::atomic label; // // User editable buffer, 16 bit string. diff --git a/src/demod/DemodulatorMgr.cpp b/src/demod/DemodulatorMgr.cpp index 4fa9e85..761a6b6 100644 --- a/src/demod/DemodulatorMgr.cpp +++ b/src/demod/DemodulatorMgr.cpp @@ -163,6 +163,7 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) { demod->terminate(); //Do not cleanup immediatly + std::lock_guard < std::mutex > lock_deleted(deleted_demods_busy); demods_deleted.push_back(demod); } @@ -225,11 +226,7 @@ void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool tempo } #endif wxGetApp().getBookmarkMgr().updateActiveList(); - } else { - std::lock_guard < std::recursive_mutex > lock(demods_busy); - garbageCollect(); - ReBufferGC::garbageCollect(); - } + } if (activeVisualDemodulator.load()) { activeVisualDemodulator.load()->setVisualOutputQueue(nullptr); @@ -281,25 +278,29 @@ DemodulatorInstance *DemodulatorMgr::getLastDemodulatorWith(const std::string& t return nullptr; } -//Private internal method, no need to protect it with demods_busy void DemodulatorMgr::garbageCollect() { - if (demods_deleted.size()) { + + std::lock_guard < std::mutex > lock(deleted_demods_busy); + + std::vector::iterator it = demods_deleted.begin(); + + while (it != demods_deleted.end()) { + + if ((*it)->isTerminated()) { + + DemodulatorInstance *deleted = (*it); - std::vector::iterator i; + std::cout << "Garbage collected demodulator instance '" << deleted->getLabel() << "'... " << std::endl << std::flush; + demods_deleted.erase(it); + delete deleted; - for (i = demods_deleted.begin(); i != demods_deleted.end(); i++) { - if ((*i)->isTerminated()) { - DemodulatorInstance *deleted = (*i); - demods_deleted.erase(i); - - std::cout << "Garbage collected demodulator instance " << deleted->getLabel() << std::endl; - - delete deleted; - return; - } + //only garbage collect 1 demod at a time. + return; } - - } + else { + it++; + } + } //end while } void DemodulatorMgr::updateLastState() { @@ -431,7 +432,6 @@ void DemodulatorMgr::saveInstance(DataNode *node, DemodulatorInstance *inst) { *settingsNode->newChild(msi->first.c_str()) = msi->second; } } - } DemodulatorInstance *DemodulatorMgr::loadInstance(DataNode *node) { diff --git a/src/demod/DemodulatorMgr.h b/src/demod/DemodulatorMgr.h index ca65c22..e6887f1 100644 --- a/src/demod/DemodulatorMgr.h +++ b/src/demod/DemodulatorMgr.h @@ -67,10 +67,11 @@ public: void saveInstance(DataNode *node, DemodulatorInstance *inst); DemodulatorInstance *loadInstance(DataNode *node); + + //to be called periodically to cleanup removed demodulators. + void garbageCollect(); private: - - void garbageCollect(); std::vector demods; std::vector demods_deleted; @@ -91,6 +92,8 @@ private: //protects access to demods lists and such, need to be recursive //because of the usage of public re-entrant methods std::recursive_mutex demods_busy; + + mutable std::mutex deleted_demods_busy; std::map lastModemSettings; std::map outputDevices; diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index 574c7fa..f7a6217 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -12,6 +12,9 @@ #include "CubicSDR.h" #include "DemodulatorInstance.h" +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) + DemodulatorPreThread::DemodulatorPreThread(DemodulatorInstance *parent) : IOThread(), iqResampler(NULL), iqResampleRatio(1), cModem(nullptr), cModemKit(nullptr), iqInputQueue(NULL), iqOutputQueue(NULL) { initialized.store(false); @@ -71,9 +74,11 @@ void DemodulatorPreThread::run() { t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread); while (!stopping) { - DemodulatorThreadIQData *inp; + DemodulatorThreadIQDataPtr inp; - iqInputQueue->pop(inp); + if (!iqInputQueue->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } if (frequencyChanged.load()) { currentFrequency.store(newFrequency); @@ -157,7 +162,7 @@ void DemodulatorPreThread::run() { } if (cModem && cModemKit && abs(shiftFrequency) > (int) ((double) (inp->sampleRate / 2) * 1.5)) { - inp->decRefCount(); + continue; } @@ -192,7 +197,7 @@ void DemodulatorPreThread::run() { out_buf = temp_buf; } - DemodulatorThreadPostIQData *resamp = buffers.getBuffer(); + DemodulatorThreadPostIQDataPtr resamp = buffers.getBuffer(); size_t out_size = ceil((double) (bufSize) * iqResampleRatio) + 512; @@ -218,8 +223,6 @@ void DemodulatorPreThread::run() { iqOutputQueue->push(resamp); } - inp->decRefCount(); - DemodulatorWorkerThreadResult result; //process all worker results until while (!stopping && workerResults->try_pop(result)) { @@ -277,11 +280,8 @@ void DemodulatorPreThread::run() { } } //end while stopping - DemodulatorThreadPostIQData *tmp; - while (iqOutputQueue->try_pop(tmp)) { - - tmp->decRefCount(); - } + + iqOutputQueue->flush(); buffers.purge(); } @@ -348,7 +348,7 @@ int DemodulatorPreThread::getAudioSampleRate() { void DemodulatorPreThread::terminate() { IOThread::terminate(); - DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue + DemodulatorThreadIQDataPtr inp(new DemodulatorThreadIQData); // push dummy to nudge queue //VSO: blocking push : iqInputQueue->push(inp); diff --git a/src/demod/DemodulatorPreThread.h b/src/demod/DemodulatorPreThread.h index 55bdfcd..88e6dff 100644 --- a/src/demod/DemodulatorPreThread.h +++ b/src/demod/DemodulatorPreThread.h @@ -17,7 +17,7 @@ class DemodulatorPreThread : public IOThread { public: DemodulatorPreThread(DemodulatorInstance *parent); - ~DemodulatorPreThread(); + virtual ~DemodulatorPreThread(); virtual void run(); diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index a186404..5767c08 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -12,6 +12,9 @@ #define M_PI 3.14159265358979323846 #endif +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) + #ifdef __APPLE__ #include #endif @@ -79,15 +82,16 @@ void DemodulatorThread::run() { ModemIQData modemData; while (!stopping) { - DemodulatorThreadPostIQData *inp; - - iqInputQueue->pop(inp); - // std::lock_guard < std::mutex > lock(inp->m_mutex); + DemodulatorThreadPostIQDataPtr inp; + if (!iqInputQueue->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } + size_t bufSize = inp->data.size(); if (!bufSize) { - inp->decRefCount(); + continue; } @@ -104,7 +108,7 @@ void DemodulatorThread::run() { } if (!cModem || !cModemKit) { - inp->decRefCount(); + continue; } @@ -115,7 +119,7 @@ void DemodulatorThread::run() { modemData.sampleRate = inp->sampleRate; modemData.data.assign(inputData->begin(), inputData->end()); - AudioThreadInput *ati = nullptr; + AudioThreadInputPtr ati = nullptr; ModemAnalog *modemAnalog = (cModem->getType() == "analog")?((ModemAnalog *)cModem):nullptr; ModemDigital *modemDigital = (cModem->getType() == "digital")?((ModemDigital *)cModem):nullptr; @@ -133,7 +137,7 @@ void DemodulatorThread::run() { ati->data.resize(0); } - cModem->demodulate(cModemKit, &modemData, ati); + cModem->demodulate(cModemKit, &modemData, ati.get()); double currentSignalLevel = 0; double sampleTime = double(inp->data.size()) / double(inp->sampleRate); @@ -225,7 +229,6 @@ void DemodulatorThread::run() { } } } else if (ati) { - ati->setRefCount(0); ati = nullptr; } @@ -238,7 +241,7 @@ void DemodulatorThread::run() { } if ((ati || modemDigital) && localAudioVisOutputQueue != nullptr && localAudioVisOutputQueue->empty()) { - AudioThreadInput *ati_vis = new AudioThreadInput; + AudioThreadInputPtr ati_vis(new AudioThreadInput); ati_vis->sampleRate = inp->sampleRate; ati_vis->inputRate = inp->sampleRate; @@ -246,7 +249,7 @@ void DemodulatorThread::run() { size_t num_vis = DEMOD_VIS_SIZE; if (modemDigital) { if (ati) { // TODO: handle digital modems with audio output - ati->setRefCount(0); + ati = nullptr; } ati_vis->data.resize(inputData->size()); @@ -300,7 +303,7 @@ void DemodulatorThread::run() { if (!localAudioVisOutputQueue->try_push(ati_vis)) { //non-blocking push needed for audio vis out - ati_vis->setRefCount(0); + std::cout << "DemodulatorThread::run() cannot push ati_vis into localAudioVisOutputQueue, is full !" << std::endl; std::this_thread::yield(); } @@ -310,12 +313,10 @@ void DemodulatorThread::run() { if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) { //non-blocking push needed for audio out if (!audioOutputQueue->try_push(ati)) { - ati->decRefCount(); + std::cout << "DemodulatorThread::run() cannot push ati into audioOutputQueue, is full !" << std::endl; std::this_thread::yield(); } - } else { - ati->setRefCount(0); } } @@ -335,28 +336,12 @@ void DemodulatorThread::run() { break; } } - - - inp->decRefCount(); } // end while !stopping // Purge any unused inputs, with a non-blocking pop - DemodulatorThreadPostIQData *ref; - while (iqInputQueue->try_pop(ref)) { - - if (ref) { // May have other consumers; just decrement - ref->decRefCount(); - } - } - - AudioThreadInput *ref_audio; - while (audioOutputQueue->try_pop(ref_audio)) { - - if (ref_audio) { // Originated here; set RefCount to 0 - ref_audio->setRefCount(0); - } - } + iqInputQueue->flush(); + audioOutputQueue->flush(); outputBuffers.purge(); @@ -365,7 +350,7 @@ void DemodulatorThread::run() { void DemodulatorThread::terminate() { IOThread::terminate(); - DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue + DemodulatorThreadPostIQDataPtr inp(new DemodulatorThreadPostIQData); // push dummy to nudge queue //VSO: blocking push iqInputQueue->push(inp); diff --git a/src/demod/DemodulatorThread.h b/src/demod/DemodulatorThread.h index 1df889c..5152364 100644 --- a/src/demod/DemodulatorThread.h +++ b/src/demod/DemodulatorThread.h @@ -10,7 +10,7 @@ #include "AudioThread.h" #include "Modem.h" -typedef ThreadBlockingQueue DemodulatorThreadOutputQueue; +typedef ThreadBlockingQueue DemodulatorThreadOutputQueue; #define DEMOD_VIS_SIZE 2048 #define DEMOD_SIGNAL_MIN -30 @@ -22,7 +22,7 @@ class DemodulatorThread : public IOThread { public: DemodulatorThread(DemodulatorInstance *parent); - ~DemodulatorThread(); + virtual ~DemodulatorThread(); void onBindOutput(std::string name, ThreadQueueBase *threadQueue); diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index b1304a7..b7e6daa 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -6,6 +6,9 @@ #include "CubicSDR.h" #include +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) + DemodulatorWorkerThread::DemodulatorWorkerThread() : IOThread(), commandQueue(NULL), resultQueue(NULL), cModem(nullptr), cModemKit(nullptr) { } @@ -31,7 +34,9 @@ void DemodulatorWorkerThread::run() { //we are waiting for the first command to show up (blocking!) //then consuming the commands until done. while (!done) { - commandQueue->pop(command); + if (!commandQueue->pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } switch (command.cmd) { case DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS: diff --git a/src/demod/DemodulatorWorkerThread.h b/src/demod/DemodulatorWorkerThread.h index cb56176..0ebff7c 100644 --- a/src/demod/DemodulatorWorkerThread.h +++ b/src/demod/DemodulatorWorkerThread.h @@ -76,7 +76,7 @@ class DemodulatorWorkerThread : public IOThread { public: DemodulatorWorkerThread(); - ~DemodulatorWorkerThread(); + virtual ~DemodulatorWorkerThread(); virtual void run(); diff --git a/src/forms/Bookmark/BookmarkView.cpp b/src/forms/Bookmark/BookmarkView.cpp index 883b813..699ef9d 100644 --- a/src/forms/Bookmark/BookmarkView.cpp +++ b/src/forms/Bookmark/BookmarkView.cpp @@ -1411,7 +1411,7 @@ void BookmarkView::onEnterWindow( wxMouseEvent& event ) { } #endif - setStatusText("You can mouse-drag a bookmark entry from one category to the next..etc. TODO: add more Bookmarks descriptions"); + setStatusText("Drag & Drop to create / move bookmarks, Group and arrange bookmarks, quick Search by keywords."); } diff --git a/src/modules/modem/Modem.h b/src/modules/modem/Modem.h index a22166d..355f492 100644 --- a/src/modules/modem/Modem.h +++ b/src/modules/modem/Modem.h @@ -8,6 +8,7 @@ #include "AudioThread.h" #include #include +#include #define MIN_BANDWIDTH 500 @@ -25,7 +26,7 @@ public: int audioSampleRate; }; -class ModemIQData: public ReferenceCounter { +class ModemIQData { public: std::vector data; long long sampleRate; @@ -34,11 +35,13 @@ public: } - ~ModemIQData() { - std::lock_guard < std::recursive_mutex > lock(m_mutex); + virtual ~ModemIQData() { + } }; +typedef std::shared_ptr ModemIQDataPtr; + // Copy of SoapySDR::Range, original comments class ModemRange { diff --git a/src/modules/modem/analog/ModemAM.cpp b/src/modules/modem/analog/ModemAM.cpp index 03da7e8..5c3604f 100644 --- a/src/modules/modem/analog/ModemAM.cpp +++ b/src/modules/modem/analog/ModemAM.cpp @@ -24,13 +24,13 @@ int ModemAM::getDefaultSampleRate() { return 6000; } -void ModemAM::demodulate(ModemKit *kit, ModemIQData *input, AudioThreadInput *audioOut) { +void ModemAM::demodulate(ModemKit *kit, ModemIQData *input, AudioThreadInput* audioOut) { ModemKitAnalog *amkit = (ModemKitAnalog *)kit; initOutputBuffers(amkit,input); if (!bufSize) { - input->decRefCount(); + return; } diff --git a/src/modules/modem/analog/ModemDSB.cpp b/src/modules/modem/analog/ModemDSB.cpp index 37f5b26..4c69c12 100644 --- a/src/modules/modem/analog/ModemDSB.cpp +++ b/src/modules/modem/analog/ModemDSB.cpp @@ -30,7 +30,7 @@ void ModemDSB::demodulate(ModemKit *kit, ModemIQData *input, AudioThreadInput *a initOutputBuffers(amkit, input); if (!bufSize) { - input->decRefCount(); + return; } diff --git a/src/modules/modem/analog/ModemFM.cpp b/src/modules/modem/analog/ModemFM.cpp index da969b8..cf20709 100644 --- a/src/modules/modem/analog/ModemFM.cpp +++ b/src/modules/modem/analog/ModemFM.cpp @@ -29,7 +29,7 @@ void ModemFM::demodulate(ModemKit *kit, ModemIQData *input, AudioThreadInput *au initOutputBuffers(fmkit, input); if (!bufSize) { - input->decRefCount(); + return; } diff --git a/src/modules/modem/analog/ModemIQ.cpp b/src/modules/modem/analog/ModemIQ.cpp index 9fa3ec7..81af37d 100644 --- a/src/modules/modem/analog/ModemIQ.cpp +++ b/src/modules/modem/analog/ModemIQ.cpp @@ -42,7 +42,7 @@ void ModemIQ::demodulate(ModemKit * /* kit */, ModemIQData *input, AudioThreadIn size_t bufSize = input->data.size(); if (!bufSize) { - input->decRefCount(); + return; } diff --git a/src/modules/modem/analog/ModemLSB.cpp b/src/modules/modem/analog/ModemLSB.cpp index 295a9c4..c3fb76d 100644 --- a/src/modules/modem/analog/ModemLSB.cpp +++ b/src/modules/modem/analog/ModemLSB.cpp @@ -46,7 +46,7 @@ void ModemLSB::demodulate(ModemKit *kit, ModemIQData *input, AudioThreadInput *a initOutputBuffers(akit,input); if (!bufSize) { - input->decRefCount(); + return; } diff --git a/src/modules/modem/analog/ModemNBFM.cpp b/src/modules/modem/analog/ModemNBFM.cpp index fd1b255..84e3202 100644 --- a/src/modules/modem/analog/ModemNBFM.cpp +++ b/src/modules/modem/analog/ModemNBFM.cpp @@ -29,7 +29,7 @@ void ModemNBFM::demodulate(ModemKit *kit, ModemIQData *input, AudioThreadInput * initOutputBuffers(fmkit, input); if (!bufSize) { - input->decRefCount(); + return; } diff --git a/src/modules/modem/analog/ModemUSB.cpp b/src/modules/modem/analog/ModemUSB.cpp index 775fadc..8b30761 100644 --- a/src/modules/modem/analog/ModemUSB.cpp +++ b/src/modules/modem/analog/ModemUSB.cpp @@ -46,7 +46,7 @@ void ModemUSB::demodulate(ModemKit *kit, ModemIQData *input, AudioThreadInput *a initOutputBuffers(akit,input); if (!bufSize) { - input->decRefCount(); + return; } diff --git a/src/process/FFTDataDistributor.cpp b/src/process/FFTDataDistributor.cpp index 8636b70..07a6a3d 100644 --- a/src/process/FFTDataDistributor.cpp +++ b/src/process/FFTDataDistributor.cpp @@ -5,6 +5,9 @@ #include #include +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) + FFTDataDistributor::FFTDataDistributor() : outputBuffers("FFTDataDistributorBuffers"), fftSize(DEFAULT_FFT_SIZE), linesPerSecond(DEFAULT_WATERFALL_LPS), lineRateAccum(0.0) { } @@ -28,8 +31,11 @@ void FFTDataDistributor::process() { if (!isAnyOutputEmpty()) { return; } - DemodulatorThreadIQData *inp; - input->pop(inp); + DemodulatorThreadIQDataPtr inp; + + if (!input->pop(inp, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } if (inp) { //Settings have changed, set new values and dump all previous samples stored in inputBuffer: @@ -73,7 +79,7 @@ void FFTDataDistributor::process() { memcpy(&inputBuffer.data[bufferOffset+bufferedItems],&inp->data[0], nbSamplesToAdd *sizeof(liquid_float_complex)); bufferedItems += nbSamplesToAdd; // - inp->decRefCount(); + } else { //empty inp, wait for another. continue; @@ -105,7 +111,8 @@ void FFTDataDistributor::process() { if (lineRateAccum >= 1.0) { //each i represents a FFT computation - DemodulatorThreadIQData *outp = outputBuffers.getBuffer(); + DemodulatorThreadIQDataPtr outp = outputBuffers.getBuffer(); + outp->frequency = inputBuffer.frequency; outp->sampleRate = inputBuffer.sampleRate; outp->data.assign(inputBuffer.data.begin()+bufferOffset+i, diff --git a/src/process/ScopeVisualProcessor.cpp b/src/process/ScopeVisualProcessor.cpp index 5b83b07..a1eefac 100644 --- a/src/process/ScopeVisualProcessor.cpp +++ b/src/process/ScopeVisualProcessor.cpp @@ -47,7 +47,7 @@ void ScopeVisualProcessor::process() { if (!isOutputEmpty()) { return; } - AudioThreadInput *audioInputData; + AudioThreadInputPtr audioInputData; if (input->try_pop(audioInputData)) { @@ -56,11 +56,12 @@ void ScopeVisualProcessor::process() { } size_t i, iMax = audioInputData->data.size(); if (!iMax) { - delete audioInputData; //->decRefCount(); + //discard audioInputData. + audioInputData = nullptr; return; } - ScopeRenderData *renderData = NULL; + ScopeRenderDataPtr renderData = nullptr; if (scopeEnabled) { iMax = audioInputData->data.size(); @@ -150,7 +151,7 @@ void ScopeVisualProcessor::process() { renderData->inputRate = audioInputData->inputRate; renderData->sampleRate = audioInputData->sampleRate; - delete audioInputData; //->decRefCount(); + audioInputData = nullptr; //->decRefCount(); double fft_ceil = 0, fft_floor = 1; @@ -212,8 +213,6 @@ void ScopeVisualProcessor::process() { renderData->spectrum = true; distribute(renderData); - } else { - delete audioInputData; //->decRefCount(); - } + } } //end if try_pop() } diff --git a/src/process/ScopeVisualProcessor.h b/src/process/ScopeVisualProcessor.h index d5ba64c..25185be 100644 --- a/src/process/ScopeVisualProcessor.h +++ b/src/process/ScopeVisualProcessor.h @@ -6,8 +6,9 @@ #include "VisualProcessor.h" #include "AudioThread.h" #include "ScopePanel.h" +#include -class ScopeRenderData: public ReferenceCounter { +class ScopeRenderData { public: std::vector waveform_points; ScopePanel::ScopeMode mode = ScopePanel::SCOPE_MODE_Y; @@ -17,9 +18,15 @@ public: bool spectrum; int fft_size; double fft_floor, fft_ceil; + + virtual ~ScopeRenderData() { + + } }; -typedef ThreadBlockingQueue ScopeRenderDataQueue; +typedef std::shared_ptr ScopeRenderDataPtr; + +typedef ThreadBlockingQueue ScopeRenderDataQueue; class ScopeVisualProcessor : public VisualProcessor { public: diff --git a/src/process/SpectrumVisualProcessor.cpp b/src/process/SpectrumVisualProcessor.cpp index 7b02892..2e69fda 100644 --- a/src/process/SpectrumVisualProcessor.cpp +++ b/src/process/SpectrumVisualProcessor.cpp @@ -4,6 +4,8 @@ #include "SpectrumVisualProcessor.h" #include "CubicSDR.h" +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) SpectrumVisualProcessor::SpectrumVisualProcessor() : outputBuffers("SpectrumVisualProcessorBuffers") { lastInputBandwidth = 0; @@ -192,22 +194,18 @@ void SpectrumVisualProcessor::process() { fftSizeChanged.store(false); } - DemodulatorThreadIQData *iqData; + DemodulatorThreadIQDataPtr iqData; - input->pop(iqData); + if (!input->pop(iqData, HEARTBEAT_CHECK_PERIOD_MICROS)) { + return; + } if (!iqData) { return; } - - //Start by locking concurrent access to iqData - std::lock_guard < std::recursive_mutex > lock(iqData->getMonitor()); - //then get the busy_lock std::lock_guard < std::mutex > busy_lock(busy_run); - - bool doPeak = peakHold.load() && (peakReset.load() == 0); @@ -246,7 +244,6 @@ void SpectrumVisualProcessor::process() { if (is_view.load()) { if (!iqData->sampleRate) { - iqData->decRefCount(); return; } @@ -387,7 +384,7 @@ void SpectrumVisualProcessor::process() { } if (execute) { - SpectrumVisualData *output = outputBuffers.getBuffer(); + SpectrumVisualDataPtr output = outputBuffers.getBuffer(); if (output->spectrum_points.size() != fftSize * 2) { output->spectrum_points.resize(fftSize * 2); @@ -597,10 +594,7 @@ void SpectrumVisualProcessor::process() { distribute(output); } - } - - iqData->decRefCount(); - + } lastView = is_view.load(); } diff --git a/src/process/SpectrumVisualProcessor.h b/src/process/SpectrumVisualProcessor.h index bef6490..3dcab2f 100644 --- a/src/process/SpectrumVisualProcessor.h +++ b/src/process/SpectrumVisualProcessor.h @@ -6,20 +6,24 @@ #include "VisualProcessor.h" #include "DemodDefs.h" #include +#include #define SPECTRUM_VZM 2 #define PEAK_RESET_COUNT 30 -class SpectrumVisualData : public ReferenceCounter { +class SpectrumVisualData { public: std::vector spectrum_points; std::vector spectrum_hold_points; double fft_ceiling, fft_floor; long long centerFreq; int bandwidth; + + virtual ~SpectrumVisualData() {}; }; -typedef ThreadBlockingQueue SpectrumVisualDataQueue; +typedef std::shared_ptr SpectrumVisualDataPtr; +typedef ThreadBlockingQueue SpectrumVisualDataQueue; class SpectrumVisualProcessor : public VisualProcessor { public: diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index 7a9ee70..df1810a 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -9,11 +9,16 @@ #include #include -template +template class VisualProcessor { + // - typedef ThreadBlockingQueue VisualInputQueueType; - typedef ThreadBlockingQueue VisualOutputQueueType; + typedef std::shared_ptr InputDataTypePtr; + typedef std::shared_ptr OutputDataTypePtr; + + typedef ThreadBlockingQueue VisualInputQueueType; + typedef ThreadBlockingQueue VisualOutputQueueType; + typedef typename std::vector< VisualOutputQueueType *>::iterator outputs_i; public: virtual ~VisualProcessor() { @@ -96,22 +101,19 @@ protected: //available outputs, previously set by attachOutput(). //* \param[in] timeout The number of microseconds to wait to push an item in each one of the outputs, 0(default) means indefinite wait. //* \param[in] errorMessage an error message written on std::cout in case pf push timeout. - void distribute(OutputDataType *item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") { + void distribute(OutputDataTypePtr item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) { std::lock_guard < std::recursive_mutex > busy_lock(busy_update); //We will try to distribute 'output' among all 'outputs', - //so 'output' will a-priori be shared among all 'outputs' so set its ref count to this - //amount. - item->setRefCount((int)outputs.size()); + //so 'output' will a-priori be shared among all 'outputs'. + for (outputs_i it = outputs.begin(); it != outputs.end(); it++) { - //if 'output' failed to be given to an outputs_i, dec its ref count accordingly. - //blocking push, with a timeout + //'output' can fail to be given to an outputs_i, + //using a blocking push, with a timeout if (!(*it)->push(item, timeout, errorMessage)) { - item->decRefCount(); + //TODO : trace ? } } - // Now 'item' refcount matches the times 'item' has been successfully distributed, - //i.e shared among the outputs. } //the incoming data queue @@ -127,7 +129,7 @@ protected: //Specialization much like VisualDataReDistributor, except //the input (pointer) is directly re-dispatched //to outputs, so that all output indeed SHARE the same instance. -template +template class VisualDataDistributor : public VisualProcessor { protected: virtual void process() { @@ -136,18 +138,14 @@ protected: if (!VisualProcessor::isAnyOutputEmpty()) { if (inp) { - inp->decRefCount(); + //nothing } return; } if (inp) { - int previousRefCount = inp->getRefCount(); VisualProcessor::distribute(inp); - //inp is now shared through the distribute(), which overwrite the previous ref count, - //so increment it properly. - int distributeRefCount = inp->getRefCount(); - inp->setRefCount(previousRefCount + distributeRefCount); + //inp is now shared through the distribute() call. } } } @@ -155,7 +153,7 @@ protected: //specialization class which process() take an input item and re-dispatch //A COPY to every outputs, without further processing. This is a 1-to-n dispatcher. -template +template class VisualDataReDistributor : public VisualProcessor { protected: virtual void process() { @@ -164,15 +162,17 @@ protected: if (!VisualProcessor::isAnyOutputEmpty()) { if (inp) { - inp->decRefCount(); + //nothing } return; } if (inp) { - OutputDataType *outp = buffers.getBuffer(); + OutputDataTypePtr outp = buffers.getBuffer(); + + //'deep copy' of the contents (*outp) = (*inp); - inp->decRefCount(); + VisualProcessor::distribute(outp); } } diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index afefd60..fa9b38a 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -8,6 +8,9 @@ #include #include +//50 ms +#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) + SDRPostThread::SDRPostThread() : IOThread(), buffers("SDRPostThreadBuffers"), visualDataBuffers("SDRPostThreadVisualDataBuffers"), frequency(0) { iqDataInQueue = NULL; iqDataOutQueue = NULL; @@ -183,23 +186,25 @@ void SDRPostThread::run() { iqActiveDemodVisualQueue = static_cast(getOutputQueue("IQActiveDemodVisualDataOutput")); while (!stopping) { - SDRThreadIQData *data_in; + SDRThreadIQDataPtr data_in; - iqDataInQueue->pop(data_in); + if (!iqDataInQueue->pop(data_in, HEARTBEAT_CHECK_PERIOD_MICROS)) { + continue; + } // std::lock_guard < std::mutex > lock(data_in->m_mutex); std::lock_guard < std::mutex > lock(busy_demod); if (data_in && data_in->data.size()) { if(data_in->numChannels > 1) { - runPFBCH(data_in); + runPFBCH(data_in.get()); } else { - runSingleCH(data_in); + runSingleCH(data_in.get()); } } if (data_in) { - data_in->decRefCount(); + //nothing } bool doUpdate = false; @@ -217,9 +222,8 @@ void SDRPostThread::run() { } //end while //Be safe, remove as many elements as possible - DemodulatorThreadIQData *visualDataDummy; - while (iqVisualQueue && iqVisualQueue->try_pop(visualDataDummy)) { - visualDataDummy->decRefCount(); + if (iqVisualQueue) { + iqVisualQueue->flush(); } // buffers.purge(); @@ -230,7 +234,7 @@ void SDRPostThread::run() { void SDRPostThread::terminate() { IOThread::terminate(); - SDRThreadIQData *dummy = new SDRThreadIQData; + SDRThreadIQDataPtr dummy(new SDRThreadIQData); //VSO: blocking push iqDataInQueue->push(dummy); } @@ -278,8 +282,8 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { } if (refCount) { - DemodulatorThreadIQData *demodDataOut = buffers.getBuffer(); - demodDataOut->setRefCount(refCount); + DemodulatorThreadIQDataPtr demodDataOut = buffers.getBuffer(); + demodDataOut->frequency = frequency; demodDataOut->sampleRate = sampleRate; @@ -333,7 +337,7 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { } if (iqDataOutQueue != NULL && !iqDataOutQueue->full()) { - DemodulatorThreadIQData *iqDataOut = visualDataBuffers.getBuffer(); + DemodulatorThreadIQDataPtr iqDataOut = visualDataBuffers.getBuffer(); bool doVis = false; @@ -341,8 +345,6 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { doVis = true; } - iqDataOut->setRefCount(1 + (doVis?1:0)); - iqDataOut->frequency = data_in->frequency; iqDataOut->sampleRate = data_in->sampleRate; iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize); @@ -407,8 +409,7 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { continue; } - DemodulatorThreadIQData *demodDataOut = buffers.getBuffer(); - demodDataOut->setRefCount(demodChannelActive[i] + doDemodVis); + DemodulatorThreadIQDataPtr demodDataOut = buffers.getBuffer(); demodDataOut->frequency = chanCenters[i]; demodDataOut->sampleRate = chanBw; diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index adb4eb9..a24fa98 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -207,17 +207,17 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { //TODO: Add in doc the need to reduce SoapySDR device buffer length (if available) to restore higher fps. //0. Retreive a new batch - SDRThreadIQData *dataOut = buffers.getBuffer(); + SDRThreadIQDataPtr dataOut = buffers.getBuffer(); //resize to the target size immedialetly, to minimize later reallocs: - assureBufferMinSize(dataOut, nElems); + assureBufferMinSize(dataOut.get(), nElems); //1.If overflow occured on the previous readStream(), transfer it in dataOut directly. if (numOverflow > 0) { int n_overflow = std::min(numOverflow, nElems); //safety - assureBufferMinSize(dataOut, n_overflow); + assureBufferMinSize(dataOut.get(), n_overflow); ::memcpy(&dataOut->data[0], &overflowBuffer.data[0], n_overflow * sizeof(liquid_float_complex)); n_read = n_overflow; @@ -266,7 +266,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { float *pp = (float *)buffs[0]; //safety - assureBufferMinSize(dataOut, n_read + n_requested); + assureBufferMinSize(dataOut.get(), n_read + n_requested); if (iq_swap.load()) { for (int i = 0; i < n_requested; i++) { @@ -315,7 +315,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { float *pp = (float *)buffs[0]; //safety - assureBufferMinSize(dataOut, n_read + n_stream_read); + assureBufferMinSize(dataOut.get(), n_read + n_stream_read); if (iq_swap.load()) { for (int i = 0; i < n_stream_read; i++) { @@ -349,8 +349,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { if (!iqDataOutQueue->try_push(dataOut)) { //The rest of the system saturates, - //finally the push didn't suceeded, recycle dataOut immediatly. - dataOut->setRefCount(0); + //finally the push didn't suceeded. std::cout << "SDRThread::readStream(): 3.2 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl; @@ -359,7 +358,6 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { } } else { - dataOut->setRefCount(0); std::cout << "SDRThread::readStream(): 3.1 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl; //saturation, let a chance to the other threads to consume the existing samples std::this_thread::yield(); diff --git a/src/sdr/SoapySDRThread.h b/src/sdr/SoapySDRThread.h index e38077f..93fb381 100644 --- a/src/sdr/SoapySDRThread.h +++ b/src/sdr/SoapySDRThread.h @@ -4,7 +4,7 @@ #pragma once #include - +#include #include "ThreadBlockingQueue.h" #include "DemodulatorMgr.h" #include "SDRDeviceInfo.h" @@ -17,7 +17,7 @@ #include -class SDRThreadIQData: public ReferenceCounter { +class SDRThreadIQData { public: long long frequency; long long sampleRate; @@ -35,12 +35,12 @@ public: } - ~SDRThreadIQData() { + virtual ~SDRThreadIQData() { } }; - -typedef ThreadBlockingQueue SDRThreadIQDataQueue; +typedef std::shared_ptr SDRThreadIQDataPtr; +typedef ThreadBlockingQueue SDRThreadIQDataQueue; class SDRThread : public IOThread { private: diff --git a/src/util/ThreadBlockingQueue.h b/src/util/ThreadBlockingQueue.h index d4e5588..cf8134c 100644 --- a/src/util/ThreadBlockingQueue.h +++ b/src/util/ThreadBlockingQueue.h @@ -3,7 +3,7 @@ #pragma once -#include +#include #include #include #include @@ -29,24 +29,22 @@ class ThreadQueueBase { template class ThreadBlockingQueue : public ThreadQueueBase { - typedef typename std::vector::value_type value_type; - typedef typename std::vector::size_type size_type; + typedef typename std::deque::value_type value_type; + typedef typename std::deque::size_type size_type; public: /*! Create safe blocking queue. */ ThreadBlockingQueue() { - //at least 1 (== Exchanger) - m_circular_buffer.resize(MIN_ITEM_NB); + //at least 1 (== Java SynchronizedQueue) + m_max_num_items = MIN_ITEM_NB; }; //Copy constructor ThreadBlockingQueue(const ThreadBlockingQueue& sq) { std::lock_guard < std::mutex > lock(sq.m_mutex); - m_circular_buffer = sq.m_circular_buffer; - m_head = sq.m_head; - m_tail = sq.m_tail; - m_size = sq.m_size; + m_queue = sq.m_queue; + m_max_num_items = sq.m_max_num_items; } /*! Destroy safe queue. */ @@ -62,11 +60,10 @@ public: void set_max_num_items(unsigned int max_num_items) { std::lock_guard < std::mutex > lock(m_mutex); - if (max_num_items > (unsigned int)m_circular_buffer.size()) { + if (max_num_items > m_max_num_items) { //Only raise the existing max size, never reduce it //for simplification sake at runtime. - m_circular_buffer.resize(max_num_items); - //m_head and m_tail stays valid for the new size. + m_max_num_items = max_num_items; m_cond_not_full.notify_all(); } } @@ -77,35 +74,34 @@ public: * \param[in] item An item. * \param[in] timeout a max waiting timeout in microseconds for an item to be pushed. * by default, = 0 means indefinite wait. - * \param[in] errorMessage an error message written on std::cout in case of the timeout wait + * \param[in] errorMessage if != nullptr (is nullptr by default) an error message written on std::cout in case of the timeout wait * \return true if an item was pushed into the queue, else a timeout has occured. */ - bool push(const value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT,const char* errorMessage = "") { + bool push(const value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT,const char* errorMessage = nullptr) { std::unique_lock < std::mutex > lock(m_mutex); if (timeout == BLOCKING_INFINITE_TIMEOUT) { m_cond_not_full.wait(lock, [this]() // Lambda funct { - return m_size < m_circular_buffer.size(); + return m_queue.size() < m_max_num_items; }); - } else if (timeout <= NON_BLOCKING_TIMEOUT && m_size >= m_circular_buffer.size()) { + } else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.size() >= m_max_num_items) { // if the value is below a threshold, consider it is a try_push() return false; } else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout), - [this]() { return m_size < m_circular_buffer.size(); })) { - std::thread::id currentThreadId = std::this_thread::get_id(); - std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << - " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << - (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; - return false; + [this]() { return m_queue.size() < m_max_num_items; })) { + + if (errorMessage != nullptr) { + std::thread::id currentThreadId = std::this_thread::get_id(); + std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << + " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " << + (timeout * 0.001) << " ms, message: " << errorMessage << std::endl << std::flush; + } + return false; } - //m_tail is already the next valid place an item can be put - m_circular_buffer[m_tail] = item; - m_tail = nextIndex(m_tail, (int)m_circular_buffer.size()); - m_size++; - + m_queue.push_back(item); m_cond_not_empty.notify_all(); return true; } @@ -118,14 +114,11 @@ public: bool try_push(const value_type& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_size >= m_circular_buffer.size()) { + if (m_queue.size() >= m_max_num_items) { return false; } - //m_tail is already the next valid place an item can be put - m_circular_buffer[m_tail] = item; - m_tail = nextIndex(m_tail, (int)m_circular_buffer.size()); - m_size++; + m_queue.push_back(item); m_cond_not_empty.notify_all(); return true; } @@ -133,33 +126,35 @@ public: /** * Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available. * \param[in] timeout The number of microseconds to wait. O (default) means indefinite wait. - * \param[in] errorMessage an error message written on std::cout in case of the timeout wait + * \param[in] errorMessage if != nullptr (is nullptr by default) an error message written on std::cout in case of the timeout wait * \return true if get an item from the queue, false if no item is received before the timeout. */ - bool pop(value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = "") { + bool pop(value_type& item, std::uint64_t timeout = BLOCKING_INFINITE_TIMEOUT, const char* errorMessage = nullptr) { std::unique_lock < std::mutex > lock(m_mutex); if (timeout == BLOCKING_INFINITE_TIMEOUT) { m_cond_not_empty.wait(lock, [this]() // Lambda funct { - return m_size > 0; + return !m_queue.empty(); }); - } else if (timeout <= NON_BLOCKING_TIMEOUT && m_size == 0) { + } else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.empty()) { // if the value is below a threshold, consider it is try_pop() return false; } else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout), - [this]() { return m_size > 0; })) { - std::thread::id currentThreadId = std::this_thread::get_id(); - std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << - " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << - (timeout * 0.001) << " ms, message: " << errorMessage << std::endl; + [this]() { return !m_queue.empty(); })) { + + if (errorMessage != nullptr) { + std::thread::id currentThreadId = std::this_thread::get_id(); + std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec << + " (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " << + (timeout * 0.001) << " ms, message: " << errorMessage << std::endl << std::flush; + } return false; } - item = m_circular_buffer[m_head]; - m_head = nextIndex(m_head, (int)m_circular_buffer.size()); - m_size--; + item = m_queue.front(); + m_queue.pop_front(); m_cond_not_full.notify_all(); return true; } @@ -172,13 +167,12 @@ public: bool try_pop(value_type& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_size == 0) { + if (m_queue.empty()) { return false; } - item = m_circular_buffer[m_head]; - m_head = nextIndex(m_head, (int)m_circular_buffer.size()); - m_size--; + item = m_queue.front(); + m_queue.pop_front(); m_cond_not_full.notify_all(); return true; } @@ -190,7 +184,7 @@ public: */ size_type size() const { std::lock_guard < std::mutex > lock(m_mutex); - return m_size; + return m_queue.size(); } /** @@ -199,7 +193,7 @@ public: */ bool empty() const { std::lock_guard < std::mutex > lock(m_mutex); - return m_size == 0; + return m_queue.empty(); } /** @@ -208,7 +202,7 @@ public: */ bool full() const { std::lock_guard < std::mutex > lock(m_mutex); - return (m_size >= m_circular_buffer.size()); + return (m_queue.size() >= m_max_num_items); } /** @@ -216,9 +210,7 @@ public: */ void flush() { std::lock_guard < std::mutex > lock(m_mutex); - m_head = 0; - m_tail = 0; - m_size = 0; + m_queue.clear(); m_cond_not_full.notify_all(); } @@ -230,25 +222,22 @@ public: if (this != &sq) { std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex); - m_circular_buffer.swap(sq.m_circular_buffer); + m_queue.swap(sq.m_queue); + std::swap(m_max_num_items, sq.m_max_num_items); - std::swap(m_head, sq.m_head); - std::swap(m_tail, sq.m_tail); - std::swap(m_size, sq.m_size); - - if (m_size > 0) { + if (!m_queue.empty()) { m_cond_not_empty.notify_all(); } - if (sq.m_size > 0) { + if (!sq.m_queue.empty()) { sq.m_cond_not_empty.notify_all(); } - if (m_size < m_circular_buffer.size()) { + if (!m_queue.full()) { m_cond_not_full.notify_all(); } - if (sq.m_size < sq.m_circular_buffer.size()) { + if (!sq.m_queue.full()) { sq.m_cond_not_full.notify_all(); } } @@ -260,17 +249,14 @@ public: std::lock_guard < std::mutex > lock1(m_mutex); std::lock_guard < std::mutex > lock2(sq.m_mutex); - m_circular_buffer = sq.m_circular_buffer; + m_queue = sq.m_queue; + m_max_num_items = sq.m_max_num_items; - m_head = sq.m_head; - m_tail = sq.m_tail; - m_size = sq.m_size; - - if (m_size > 0) { + if (!m_queue.empty()) { m_cond_not_empty.notify_all(); } - if (m_size < m_circular_buffer.size()) { + if (!m_queue.full()) { m_cond_not_full.notify_all(); } } @@ -278,27 +264,13 @@ public: } private: - /// use a circular buffer structure to prevent allocations / reallocations (fixed array + modulo) - std::vector m_circular_buffer; - /** - * The 'head' index of the element at the head of the deque, 'tail' - * the next (valid !) index at which an element can be pushed. - * m_head == m_tail means empty. - */ - int m_head = 0, m_tail = 0; - - //hold the current number of elements. - size_type m_size = 0; - - // - inline int nextIndex(int index, int mod) const { - return (index + 1 == mod) ? 0 : index + 1; - } + std::deque m_queue; mutable std::mutex m_mutex; std::condition_variable m_cond_not_empty; std::condition_variable m_cond_not_full; + size_t m_max_num_items = MIN_ITEM_NB; }; /*! Swaps the contents of two ThreadBlockingQueue objects. (external operator) */ diff --git a/src/visual/ScopeCanvas.cpp b/src/visual/ScopeCanvas.cpp index 5371513..fa25b8a 100644 --- a/src/visual/ScopeCanvas.cpp +++ b/src/visual/ScopeCanvas.cpp @@ -104,7 +104,7 @@ void ScopeCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { wxPaintDC dc(this); const wxSize ClientSize = GetClientSize(); - ScopeRenderData *avData; + ScopeRenderDataPtr avData; while (inputData.try_pop(avData)) { @@ -113,7 +113,7 @@ void ScopeCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { if (avData->waveform_points.size()) { scopePanel.setPoints(avData->waveform_points); } - avData->decRefCount(); + } else { if (avData->waveform_points.size()) { spectrumPanel.setPoints(avData->waveform_points); @@ -124,8 +124,7 @@ void ScopeCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { spectrumPanel.setFFTSize(avData->fft_size); spectrumPanel.setShowDb(showDb); } - - avData->decRefCount(); + } } diff --git a/src/visual/SpectrumCanvas.cpp b/src/visual/SpectrumCanvas.cpp index 742bb2c..abfc6f5 100644 --- a/src/visual/SpectrumCanvas.cpp +++ b/src/visual/SpectrumCanvas.cpp @@ -54,7 +54,7 @@ void SpectrumCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { wxPaintDC dc(this); const wxSize ClientSize = GetClientSize(); - SpectrumVisualData *vData; + SpectrumVisualDataPtr vData; if (visualDataQueue.try_pop(vData)) { if (vData) { @@ -62,7 +62,6 @@ void SpectrumCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { spectrumPanel.setPeakPoints(vData->spectrum_hold_points); spectrumPanel.setFloorValue(vData->fft_floor); spectrumPanel.setCeilValue(vData->fft_ceiling); - vData->decRefCount(); } } diff --git a/src/visual/WaterfallCanvas.cpp b/src/visual/WaterfallCanvas.cpp index b75ca92..9d26d56 100644 --- a/src/visual/WaterfallCanvas.cpp +++ b/src/visual/WaterfallCanvas.cpp @@ -97,7 +97,7 @@ void WaterfallCanvas::processInputQueue() { if (linesPerSecond) { if (lpsIndex >= targetVis) { while (lpsIndex >= targetVis) { - SpectrumVisualData *vData; + SpectrumVisualDataPtr vData; if (visualDataQueue.try_pop(vData)) { @@ -106,7 +106,7 @@ void WaterfallCanvas::processInputQueue() { waterfallPanel.setPoints(vData->spectrum_points); } waterfallPanel.step(); - vData->decRefCount(); + updated = true; } lpsIndex-=targetVis; @@ -915,13 +915,7 @@ void WaterfallCanvas::setLinesPerSecond(int lps) { linesPerSecond = lps; //empty all - SpectrumVisualData *vData; - while (visualDataQueue.try_pop(vData)) { - - if (vData) { - vData->decRefCount(); - } - } + visualDataQueue.flush(); } void WaterfallCanvas::setMinBandwidth(int min) {