Merge pull request #367 from cjcliffe/thread_cleanup_fix

Threading cleanup; fix some old mixer integration cruft and resolve a lot of random crashes..
This commit is contained in:
Charles J. Cliffe 2016-06-08 23:54:27 -04:00
commit 348b5404c3
7 changed files with 116 additions and 27 deletions

View File

@ -1,5 +1,8 @@
#include "IOThread.h" #include "IOThread.h"
std::mutex ReBufferGC::g_mutex;
std::set<ReferenceCounter *> ReBufferGC::garbage;
IOThread::IOThread() { IOThread::IOThread() {
terminated.store(false); terminated.store(false);
} }

View File

@ -4,6 +4,7 @@
#include <atomic> #include <atomic>
#include <deque> #include <deque>
#include <map> #include <map>
#include <set>
#include <string> #include <string>
#include <iostream> #include <iostream>
@ -23,6 +24,16 @@ class ReferenceCounter {
public: public:
// void setIndex(int idx) {
// std::lock_guard < std::recursive_mutex > lock(m_mutex);
// index = idx;
// }
// int getIndex() {
// std::lock_guard < std::recursive_mutex > lock(m_mutex);
// return index;
// }
void setRefCount(int rc) { void setRefCount(int rc) {
std::lock_guard < std::recursive_mutex > lock(m_mutex); std::lock_guard < std::recursive_mutex > lock(m_mutex);
refCount = rc; refCount = rc;
@ -49,17 +60,55 @@ protected:
private: private:
int refCount; int refCount;
// int index;
}; };
#define REBUFFER_GC_LIMIT 100 #define REBUFFER_GC_LIMIT 100
class ReBufferGC {
public:
static void garbageCollect() {
std::lock_guard < std::mutex > lock(g_mutex);
std::deque<ReferenceCounter *> garbageRemoval;
for (typename std::set<ReferenceCounter *>::iterator i = garbage.begin(); i != garbage.end(); i++) {
if ((*i)->getRefCount() <= 0) {
garbageRemoval.push_back(*i);
}
else {
// std::cout << "Garbage in queue buffer idx #" << (*i)->getIndex() << ", " << (*i)->getRefCount() << " usage(s)" << std::endl;
std::cout << "Garbage in queue buffer with " << (*i)->getRefCount() << " usage(s)" << std::endl;
}
}
if ( garbageRemoval.size() ) {
std::cout << "Garbage collecting " << garbageRemoval.size() << " ReBuffer(s)" << std::endl;
while (!garbageRemoval.empty()) {
ReferenceCounter *ref = garbageRemoval.back();
garbageRemoval.pop_back();
garbage.erase(ref);
delete ref;
}
}
}
static void addGarbage(ReferenceCounter *ref) {
std::lock_guard < std::mutex > lock(g_mutex);
garbage.insert(ref);
}
private:
static std::mutex g_mutex;
static std::set<ReferenceCounter *> garbage;
};
template<class BufferType = ReferenceCounter> template<class BufferType = ReferenceCounter>
class ReBuffer { class ReBuffer {
public: public:
ReBuffer(std::string bufferId) : bufferId(bufferId) { ReBuffer(std::string bufferId) : bufferId(bufferId) {
// indexCounter.store(0);
} }
BufferType *getBuffer() { BufferType *getBuffer() {
@ -69,7 +118,7 @@ public:
for (outputBuffersI = outputBuffers.begin(); outputBuffersI != outputBuffers.end(); outputBuffersI++) { for (outputBuffersI = outputBuffers.begin(); outputBuffersI != outputBuffers.end(); outputBuffersI++) {
if (buf == nullptr && (*outputBuffersI)->getRefCount() <= 0) { if (buf == nullptr && (*outputBuffersI)->getRefCount() <= 0) {
buf = (*outputBuffersI); buf = (*outputBuffersI);
buf->setRefCount(0); buf->setRefCount(1);
} else if ((*outputBuffersI)->getRefCount() <= 0) { } else if ((*outputBuffersI)->getRefCount() <= 0) {
(*outputBuffersI)->decRefCount(); (*outputBuffersI)->decRefCount();
} }
@ -81,6 +130,7 @@ public:
outputBuffers.pop_back(); outputBuffers.pop_back();
delete ref; delete ref;
} }
// buf->setIndex(indexCounter++);
return buf; return buf;
} }
@ -90,6 +140,8 @@ public:
} }
buf = new BufferType(); buf = new BufferType();
buf->setRefCount(1);
// buf->setIndex(indexCounter++);
outputBuffers.push_back(buf); outputBuffers.push_back(buf);
return buf; return buf;
@ -97,17 +149,28 @@ public:
void purge() { void purge() {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
// if (bufferId == "DemodulatorThreadBuffers") {
// std::cout << "'" << bufferId << "' purging.. total indexes: " << indexCounter.load() << std::endl;
// }
while (!outputBuffers.empty()) { while (!outputBuffers.empty()) {
BufferType *ref = outputBuffers.front(); BufferType *ref = outputBuffers.front();
outputBuffers.pop_front(); outputBuffers.pop_front();
delete ref; if (ref->getRefCount() <= 0) {
delete ref;
} else {
// Something isn't done with it yet; throw it on the pile.. keep this as a bug indicator for now..
std::cout << "'" << bufferId << "' pushed garbage.." << std::endl;
ReBufferGC::addGarbage(ref);
}
} }
} }
private:
private:
std::string bufferId; std::string bufferId;
std::deque<BufferType*> outputBuffers; std::deque<BufferType*> outputBuffers;
typename std::deque<BufferType*>::iterator outputBuffersI; typename std::deque<BufferType*>::iterator outputBuffersI;
mutable std::mutex m_mutex; mutable std::mutex m_mutex;
// std::atomic_int indexCounter;
}; };

View File

@ -89,8 +89,6 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
continue; continue;
} }
// std::lock_guard < std::mutex > lock(srcmix->currentInput->m_mutex);
if (srcmix->currentInput->sampleRate != src->getSampleRate()) { if (srcmix->currentInput->sampleRate != src->getSampleRate()) {
while (srcmix->inputQueue->size()) { while (srcmix->inputQueue->size()) {
srcmix->inputQueue->pop(srcmix->currentInput); srcmix->inputQueue->pop(srcmix->currentInput);
@ -393,6 +391,20 @@ void AudioThread::run() {
setSampleRate(command.int_value); setSampleRate(command.int_value);
} }
} }
// Drain any remaining inputs
if (inputQueue) while (!inputQueue->empty()) {
AudioThreadInput *ref;
inputQueue->pop(ref);
if (ref) {
ref->decRefCount();
}
}
if (currentInput) {
currentInput->setRefCount(0);
currentInput = nullptr;
}
if (deviceController[parameters.deviceId] != this) { if (deviceController[parameters.deviceId] != this) {
deviceController[parameters.deviceId]->removeThread(this); deviceController[parameters.deviceId]->removeThread(this);
@ -408,7 +420,7 @@ void AudioThread::run() {
e.printMessage(); e.printMessage();
} }
} }
if (threadQueueNotify != NULL) { if (threadQueueNotify != NULL) {
DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED); DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED);
tCmd.context = this; tCmd.context = this;
@ -431,23 +443,19 @@ void AudioThread::setActive(bool state) {
AudioThreadInput *dummy; AudioThreadInput *dummy;
if (state && !active && inputQueue) { if (state && !active && inputQueue) {
deviceController[parameters.deviceId]->bindThread(this);
} else if (!state && active) {
deviceController[parameters.deviceId]->removeThread(this);
}
// Activity state changing, clear any inputs
if(inputQueue) {
while (!inputQueue->empty()) { // flush queue while (!inputQueue->empty()) { // flush queue
inputQueue->pop(dummy); inputQueue->pop(dummy);
if (dummy) { if (dummy) {
dummy->decRefCount(); dummy->decRefCount();
} }
} }
deviceController[parameters.deviceId]->bindThread(this);
} else if (!state && active) {
deviceController[parameters.deviceId]->removeThread(this);
if(inputQueue) {
while (!inputQueue->empty()) { // flush queue
inputQueue->pop(dummy);
if (dummy) {
dummy->decRefCount();
}
}
}
} }
active = state; active = state;
} }

View File

@ -212,6 +212,7 @@ void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool tempo
#endif #endif
} else { } else {
garbageCollect(); garbageCollect();
ReBufferGC::garbageCollect();
} }
if (activeVisualDemodulator) { if (activeVisualDemodulator) {

View File

@ -271,6 +271,11 @@ void DemodulatorPreThread::run() {
} }
} }
while (!iqOutputQueue->empty()) {
DemodulatorThreadPostIQData *tmp;
iqOutputQueue->pop(tmp);
tmp->decRefCount();
}
buffers.purge(); buffers.purge();
DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED); DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED);

View File

@ -132,13 +132,11 @@ void DemodulatorThread::run() {
ati->sampleRate = cModemKit->audioSampleRate; ati->sampleRate = cModemKit->audioSampleRate;
ati->inputRate = inp->sampleRate; ati->inputRate = inp->sampleRate;
ati->setRefCount(1);
} else if (modemDigital != nullptr) { } else if (modemDigital != nullptr) {
ati = outputBuffers.getBuffer(); ati = outputBuffers.getBuffer();
ati->sampleRate = cModemKit->sampleRate; ati->sampleRate = cModemKit->sampleRate;
ati->inputRate = inp->sampleRate; ati->inputRate = inp->sampleRate;
ati->setRefCount(1);
} }
cModem->demodulate(cModemKit, &modemData, ati); cModem->demodulate(cModemKit, &modemData, ati);
@ -173,7 +171,7 @@ void DemodulatorThread::run() {
} }
} }
} else if (ati) { } else if (ati) {
ati->decRefCount(); ati->setRefCount(0);
ati = nullptr; ati = nullptr;
} }
@ -277,15 +275,25 @@ void DemodulatorThread::run() {
} }
// end while !terminated // end while !terminated
// Purge any unused inputs
while (!iqInputQueue->empty()) {
DemodulatorThreadPostIQData *ref;
iqInputQueue->pop(ref);
if (ref) { // May have other consumers; just decrement
ref->decRefCount();
}
}
while (!audioOutputQueue->empty()) {
AudioThreadInput *ref;
audioOutputQueue->pop(ref);
if (ref) { // Originated here; set RefCount to 0
ref->setRefCount(0);
}
}
outputBuffers.purge(); outputBuffers.purge();
//Guard the cleanup of audioVisOutputQueue properly. //Guard the cleanup of audioVisOutputQueue properly.
std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue); std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue);
// if (audioVisOutputQueue != nullptr && !audioVisOutputQueue->empty()) {
// AudioThreadInput *dummy_vis;
// audioVisOutputQueue->pop(dummy_vis);
// }
DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_TERMINATED); DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_TERMINATED);
tCmd.context = this; tCmd.context = this;

View File

@ -231,8 +231,9 @@ public:
/** /**
* Remove any items in the queue. * Remove any items in the queue.
*/ */
void flush() const { void flush() {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
m_queue = std::queue<T, Container>();
std::queue<T, Container> emptyQueue; std::queue<T, Container> emptyQueue;
std::swap(m_queue, emptyQueue); std::swap(m_queue, emptyQueue);
} }