TH_CLEAN_3.5: push() cleanup side of things, assure SDRThread::readStream() to actually check for full + make ThreadQueue notify even in case on not-successfull push(), make it spam notify_all() everytime

This commit is contained in:
vsonnier 2016-07-06 21:23:59 +02:00
parent b495b388c9
commit 21c8a81c32
13 changed files with 197 additions and 99 deletions

View File

@ -1645,6 +1645,7 @@ bool AppFrame::loadSession(std::string fileName) {
} }
wxGetApp().getDemodMgr().setActiveDemodulator(nullptr, false); wxGetApp().getDemodMgr().setActiveDemodulator(nullptr, false);
wxGetApp().getDemodMgr().terminateAll(); wxGetApp().getDemodMgr().terminateAll();
try { try {

View File

@ -254,6 +254,7 @@ bool CubicSDR::OnInit() {
sdrPostThread = new SDRPostThread(); sdrPostThread = new SDRPostThread();
sdrPostThread->setInputQueue("IQDataInput", pipeSDRIQData); sdrPostThread->setInputQueue("IQDataInput", pipeSDRIQData);
sdrPostThread->setOutputQueue("IQVisualDataOutput", pipeIQVisualData); sdrPostThread->setOutputQueue("IQVisualDataOutput", pipeIQVisualData);
sdrPostThread->setOutputQueue("IQDataOutput", pipeWaterfallIQVisualData); sdrPostThread->setOutputQueue("IQDataOutput", pipeWaterfallIQVisualData);
sdrPostThread->setOutputQueue("IQActiveDemodVisualDataOutput", pipeDemodIQVisualData); sdrPostThread->setOutputQueue("IQActiveDemodVisualDataOutput", pipeDemodIQVisualData);

View File

@ -8,6 +8,7 @@
#include <memory.h> #include <memory.h>
#include <mutex> #include <mutex>
std::map<int, AudioThread *> AudioThread::deviceController; std::map<int, AudioThread *> AudioThread::deviceController;
std::map<int, int> AudioThread::deviceSampleRate; std::map<int, int> AudioThread::deviceSampleRate;
std::map<int, std::thread *> AudioThread::deviceThread; std::map<int, std::thread *> AudioThread::deviceThread;
@ -19,15 +20,11 @@ AudioThread::AudioThread() : IOThread(),
underflowCount.store(0); underflowCount.store(0);
active.store(false); active.store(false);
outputDevice.store(-1); outputDevice.store(-1);
gain.store(1.0); gain = 1.0;
vBoundThreads = new std::vector<AudioThread *>;
boundThreads.store(vBoundThreads);
} }
AudioThread::~AudioThread() { AudioThread::~AudioThread() {
boundThreads.store(nullptr);
delete vBoundThreads;
} }
std::recursive_mutex & AudioThread::getMutex() std::recursive_mutex & AudioThread::getMutex()
@ -39,8 +36,8 @@ void AudioThread::bindThread(AudioThread *other) {
std::lock_guard<std::recursive_mutex> lock(m_mutex); std::lock_guard<std::recursive_mutex> lock(m_mutex);
if (std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other) == boundThreads.load()->end()) { if (std::find(boundThreads.begin(), boundThreads.end(), other) == boundThreads.end()) {
boundThreads.load()->push_back(other); boundThreads.push_back(other);
} }
} }
@ -49,9 +46,9 @@ void AudioThread::removeThread(AudioThread *other) {
std::lock_guard<std::recursive_mutex> lock(m_mutex); std::lock_guard<std::recursive_mutex> lock(m_mutex);
std::vector<AudioThread *>::iterator i; std::vector<AudioThread *>::iterator i;
i = std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other); i = std::find(boundThreads.begin(), boundThreads.end(), other);
if (i != boundThreads.load()->end()) { if (i != boundThreads.end()) {
boundThreads.load()->erase(i); boundThreads.erase(i);
} }
} }
@ -85,7 +82,7 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl; std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl;
} }
if (src->boundThreads.load()->empty()) { if (src->boundThreads.empty()) {
return 0; return 0;
} }
@ -93,9 +90,9 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
double peak = 0.0; double peak = 0.0;
//for all boundThreads //for all boundThreads
for (size_t j = 0; j < src->boundThreads.load()->size(); j++) { for (size_t j = 0; j < src->boundThreads.size(); j++) {
AudioThread *srcmix = (*(src->boundThreads.load()))[j]; AudioThread *srcmix = src->boundThreads[j];
//lock every single boundThread srcmix in succession the time we process //lock every single boundThread srcmix in succession the time we process
//its audio samples. //its audio samples.
@ -276,6 +273,7 @@ void AudioThread::enumerateDevices(std::vector<RtAudio::DeviceInfo> &devs) {
} }
void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
if (deviceController.find(deviceId) != deviceController.end()) { if (deviceController.find(deviceId) != deviceController.end()) {
AudioThreadCommand refreshDevice; AudioThreadCommand refreshDevice;
@ -286,16 +284,17 @@ void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
} }
void AudioThread::setSampleRate(int sampleRate) { void AudioThread::setSampleRate(int sampleRate) {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
if (deviceController[outputDevice.load()] == this) { if (deviceController[outputDevice.load()] == this) {
deviceSampleRate[outputDevice.load()] = sampleRate; deviceSampleRate[outputDevice.load()] = sampleRate;
dac.stopStream(); dac.stopStream();
dac.closeStream(); dac.closeStream();
std::lock_guard<std::recursive_mutex> lock(m_mutex);
for (size_t j = 0; j < boundThreads.load()->size(); j++) { for (size_t j = 0; j < boundThreads.size(); j++) {
AudioThread *srcmix = (*(boundThreads.load()))[j]; AudioThread *srcmix = boundThreads[j];
srcmix->setSampleRate(sampleRate); srcmix->setSampleRate(sampleRate);
} }
@ -318,10 +317,15 @@ void AudioThread::setSampleRate(int sampleRate) {
} }
int AudioThread::getSampleRate() { int AudioThread::getSampleRate() {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
return this->sampleRate; return this->sampleRate;
} }
void AudioThread::setupDevice(int deviceId) { void AudioThread::setupDevice(int deviceId) {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
parameters.deviceId = deviceId; parameters.deviceId = deviceId;
parameters.nChannels = 2; parameters.nChannels = 2;
parameters.firstChannel = 0; parameters.firstChannel = 0;
@ -373,6 +377,8 @@ void AudioThread::setupDevice(int deviceId) {
} }
int AudioThread::getOutputDevice() { int AudioThread::getOutputDevice() {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
if (outputDevice == -1) { if (outputDevice == -1) {
return dac.getDefaultOutputDevice(); return dac.getDefaultOutputDevice();
} }
@ -380,6 +386,9 @@ int AudioThread::getOutputDevice() {
} }
void AudioThread::setInitOutputDevice(int deviceId, int sampleRate) { void AudioThread::setInitOutputDevice(int deviceId, int sampleRate) {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
outputDevice = deviceId; outputDevice = deviceId;
if (sampleRate == -1) { if (sampleRate == -1) {
if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) { if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) {
@ -415,6 +424,7 @@ void AudioThread::run() {
//Infinite loop, witing for commands or for termination //Infinite loop, witing for commands or for termination
while (!stopping) { while (!stopping) {
AudioThreadCommand command; AudioThreadCommand command;
cmdQueue.pop(command); cmdQueue.pop(command);
if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE) { if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE) {
@ -470,10 +480,14 @@ void AudioThread::terminate() {
} }
bool AudioThread::isActive() { bool AudioThread::isActive() {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
return active; return active;
} }
void AudioThread::setActive(bool state) { void AudioThread::setActive(bool state) {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
AudioThreadInput *dummy; AudioThreadInput *dummy;
if (state && !active && inputQueue) { if (state && !active && inputQueue) {
@ -500,6 +514,9 @@ AudioThreadCommandQueue *AudioThread::getCommandQueue() {
} }
void AudioThread::setGain(float gain_in) { void AudioThread::setGain(float gain_in) {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
if (gain < 0.0) { if (gain < 0.0) {
gain = 0.0; gain = 0.0;
} }
@ -510,5 +527,8 @@ void AudioThread::setGain(float gain_in) {
} }
float AudioThread::getGain() { float AudioThread::getGain() {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
return gain; return gain;
} }

View File

@ -57,7 +57,7 @@ public:
std::atomic_bool initialized; std::atomic_bool initialized;
std::atomic_bool active; std::atomic_bool active;
std::atomic_int outputDevice; std::atomic_int outputDevice;
std::atomic<float> gain; float gain;
AudioThread(); AudioThread();
~AudioThread(); ~AudioThread();
@ -88,7 +88,7 @@ private:
AudioThreadCommandQueue cmdQueue; AudioThreadCommandQueue cmdQueue;
int sampleRate; int sampleRate;
//The own m_mutex protecting this AudioThread //The own m_mutex protecting this AudioThread, in particular boundThreads
std::recursive_mutex m_mutex; std::recursive_mutex m_mutex;
public: public:
@ -103,7 +103,8 @@ public:
static std::map<int,std::thread *> deviceThread; static std::map<int,std::thread *> deviceThread;
static void deviceCleanup(); static void deviceCleanup();
static void setDeviceSampleRate(int deviceId, int sampleRate); static void setDeviceSampleRate(int deviceId, int sampleRate);
std::atomic<std::vector<AudioThread *> *> boundThreads;
std::vector<AudioThread *> *vBoundThreads; //protected by m_mutex
std::vector<AudioThread *> boundThreads;
}; };

View File

@ -152,7 +152,6 @@ bool DemodulatorInstance::isTerminated() {
bool demodTerminated = demodulatorThread->isTerminated(); bool demodTerminated = demodulatorThread->isTerminated();
bool preDemodTerminated = demodulatorPreThread->isTerminated(); bool preDemodTerminated = demodulatorPreThread->isTerminated();
//Cleanup the worker threads, if the threads are indeed terminated //Cleanup the worker threads, if the threads are indeed terminated
if (audioTerminated) { if (audioTerminated) {
@ -168,7 +167,6 @@ bool DemodulatorInstance::isTerminated() {
if (demodTerminated) { if (demodTerminated) {
if (t_Demod) { if (t_Demod) {
#ifdef __APPLE__ #ifdef __APPLE__
pthread_join(t_Demod, nullptr); pthread_join(t_Demod, nullptr);
#else #else
@ -185,8 +183,8 @@ bool DemodulatorInstance::isTerminated() {
} }
if (preDemodTerminated) { if (preDemodTerminated) {
if (t_PreDemod) { if (t_PreDemod) {
#ifdef __APPLE__ #ifdef __APPLE__
pthread_join(t_PreDemod, NULL); pthread_join(t_PreDemod, NULL);
@ -195,10 +193,9 @@ bool DemodulatorInstance::isTerminated() {
delete t_PreDemod; delete t_PreDemod;
#endif #endif
t_PreDemod = nullptr; t_PreDemod = nullptr;
} }
} }
bool terminated = audioTerminated && demodTerminated && preDemodTerminated; bool terminated = audioTerminated && demodTerminated && preDemodTerminated;
return terminated; return terminated;

View File

@ -136,13 +136,13 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) {
i = std::find(demods.begin(), demods.end(), demod); i = std::find(demods.begin(), demods.end(), demod);
if (activeDemodulator == demod) { if (activeDemodulator == demod) {
activeDemodulator = NULL; activeDemodulator = nullptr;
} }
if (lastActiveDemodulator == demod) { if (lastActiveDemodulator == demod) {
lastActiveDemodulator = NULL; lastActiveDemodulator = nullptr;
} }
if (activeVisualDemodulator == demod) { if (activeVisualDemodulator == demod) {
activeVisualDemodulator = NULL; activeVisualDemodulator = nullptr;
} }
if (i != demods.end()) { if (i != demods.end()) {
@ -150,6 +150,7 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) {
} }
//Ask for termination //Ask for termination
demod->setActive(false);
demod->terminate(); demod->terminate();
//Do not cleanup immediatly //Do not cleanup immediatly
@ -200,27 +201,28 @@ bool DemodulatorMgr::anyDemodulatorsAt(long long freq, int bandwidth) {
void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool temporary) { void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool temporary) {
std::lock_guard < std::recursive_mutex > lock(demods_busy);
if (!temporary) { if (!temporary) {
if (activeDemodulator != NULL) { if (activeDemodulator.load() != nullptr) {
lastActiveDemodulator = activeDemodulator; lastActiveDemodulator = activeDemodulator.load();
updateLastState(); updateLastState();
} else { } else {
lastActiveDemodulator = demod; lastActiveDemodulator = demod;
} }
updateLastState(); updateLastState();
#if USE_HAMLIB #if USE_HAMLIB
if (wxGetApp().rigIsActive() && wxGetApp().getRigThread()->getFollowModem() && lastActiveDemodulator) { if (wxGetApp().rigIsActive() && wxGetApp().getRigThread()->getFollowModem() && lastActiveDemodulator.load()) {
wxGetApp().getRigThread()->setFrequency(lastActiveDemodulator->getFrequency(),true); wxGetApp().getRigThread()->setFrequency(lastActiveDemodulator.load()->getFrequency(),true);
} }
#endif #endif
} else { } else {
std::lock_guard < std::recursive_mutex > lock(demods_busy);
garbageCollect(); garbageCollect();
ReBufferGC::garbageCollect(); ReBufferGC::garbageCollect();
} }
if (activeVisualDemodulator) { if (activeVisualDemodulator.load()) {
activeVisualDemodulator->setVisualOutputQueue(NULL); activeVisualDemodulator.load()->setVisualOutputQueue(nullptr);
} }
if (demod) { if (demod) {
demod->setVisualOutputQueue(wxGetApp().getAudioVisualQueue()); demod->setVisualOutputQueue(wxGetApp().getAudioVisualQueue());
@ -238,7 +240,7 @@ void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool tempo
} }
DemodulatorInstance *DemodulatorMgr::getActiveDemodulator() { DemodulatorInstance *DemodulatorMgr::getActiveDemodulator() {
if (activeDemodulator && !activeDemodulator->isActive()) { if (activeDemodulator.load() && !activeDemodulator.load()->isActive()) {
activeDemodulator = getLastActiveDemodulator(); activeDemodulator = getLastActiveDemodulator();
} }
return activeDemodulator; return activeDemodulator;
@ -262,8 +264,6 @@ void DemodulatorMgr::garbageCollect() {
std::cout << "Garbage collected demodulator instance " << deleted->getLabel() << std::endl; std::cout << "Garbage collected demodulator instance " << deleted->getLabel() << std::endl;
delete deleted; delete deleted;
return; return;
} }
} }
@ -273,27 +273,28 @@ void DemodulatorMgr::garbageCollect() {
void DemodulatorMgr::updateLastState() { void DemodulatorMgr::updateLastState() {
std::lock_guard < std::recursive_mutex > lock(demods_busy); std::lock_guard < std::recursive_mutex > lock(demods_busy);
if (std::find(demods.begin(), demods.end(), lastActiveDemodulator) == demods.end()) { if (std::find(demods.begin(), demods.end(), lastActiveDemodulator) == demods.end()) {
if (activeDemodulator && activeDemodulator->isActive()) { if (activeDemodulator.load() && activeDemodulator.load()->isActive()) {
lastActiveDemodulator = activeDemodulator; lastActiveDemodulator = activeDemodulator.load();
} else if (activeDemodulator && !activeDemodulator->isActive()){ } else if (activeDemodulator.load() && !activeDemodulator.load()->isActive()){
activeDemodulator = NULL; activeDemodulator = nullptr;
lastActiveDemodulator = NULL; lastActiveDemodulator = nullptr;
} }
} }
if (lastActiveDemodulator && !lastActiveDemodulator->isActive()) { if (lastActiveDemodulator.load() && !lastActiveDemodulator.load()->isActive()) {
lastActiveDemodulator = NULL; lastActiveDemodulator = nullptr;
} }
if (lastActiveDemodulator) { if (lastActiveDemodulator.load()) {
lastBandwidth = lastActiveDemodulator->getBandwidth(); lastBandwidth = lastActiveDemodulator.load()->getBandwidth();
lastDemodType = lastActiveDemodulator->getDemodulatorType(); lastDemodType = lastActiveDemodulator.load()->getDemodulatorType();
lastDemodLock = lastActiveDemodulator->getDemodulatorLock()?true:false; lastDemodLock = lastActiveDemodulator.load()->getDemodulatorLock()?true:false;
lastSquelchEnabled = lastActiveDemodulator->isSquelchEnabled(); lastSquelchEnabled = lastActiveDemodulator.load()->isSquelchEnabled();
lastSquelch = lastActiveDemodulator->getSquelchLevel(); lastSquelch = lastActiveDemodulator.load()->getSquelchLevel();
lastGain = lastActiveDemodulator->getGain(); lastGain = lastActiveDemodulator.load()->getGain();
lastModemSettings[lastDemodType] = lastActiveDemodulator->readModemSettings(); lastModemSettings[lastDemodType] = lastActiveDemodulator.load()->readModemSettings();
} }
} }

View File

@ -53,15 +53,17 @@ public:
void setLastModemSettings(std::string, ModemSettings); void setLastModemSettings(std::string, ModemSettings);
void updateLastState(); void updateLastState();
private: private:
void garbageCollect(); void garbageCollect();
std::vector<DemodulatorInstance *> demods; std::vector<DemodulatorInstance *> demods;
std::vector<DemodulatorInstance *> demods_deleted; std::vector<DemodulatorInstance *> demods_deleted;
DemodulatorInstance *activeDemodulator;
DemodulatorInstance *lastActiveDemodulator; std::atomic<DemodulatorInstance *> activeDemodulator;
DemodulatorInstance *activeVisualDemodulator; std::atomic<DemodulatorInstance *> lastActiveDemodulator;
std::atomic<DemodulatorInstance *> activeVisualDemodulator;
int lastBandwidth; int lastBandwidth;
std::string lastDemodType; std::string lastDemodType;

View File

@ -66,6 +66,7 @@ void DemodulatorPreThread::run() {
while (!stopping) { while (!stopping) {
DemodulatorThreadIQData *inp; DemodulatorThreadIQData *inp;
iqInputQueue->pop(inp); iqInputQueue->pop(inp);
if (frequencyChanged.load()) { if (frequencyChanged.load()) {
@ -205,7 +206,11 @@ void DemodulatorPreThread::run() {
resamp->modemKit = cModemKit; resamp->modemKit = cModemKit;
resamp->sampleRate = currentBandwidth; resamp->sampleRate = currentBandwidth;
iqOutputQueue->push(resamp); if (!iqOutputQueue->push(resamp)) {
resamp->setRefCount(0);
std::cout << "DemodulatorPreThread::run() cannot push resamp into iqOutputQueue, is full !" << std::endl;
std::this_thread::yield();
}
} }
inp->decRefCount(); inp->decRefCount();
@ -335,7 +340,10 @@ int DemodulatorPreThread::getAudioSampleRate() {
void DemodulatorPreThread::terminate() { void DemodulatorPreThread::terminate() {
IOThread::terminate(); IOThread::terminate();
DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue
iqInputQueue->push(inp); if (!iqInputQueue->push(inp)) {
delete inp;
}
DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL); DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL);
workerQueue->push(command); workerQueue->push(command);

View File

@ -74,6 +74,7 @@ void DemodulatorThread::run() {
while (!stopping) { while (!stopping) {
DemodulatorThreadPostIQData *inp; DemodulatorThreadPostIQData *inp;
iqInputQueue->pop(inp); iqInputQueue->pop(inp);
// std::lock_guard < std::mutex > lock(inp->m_mutex); // std::lock_guard < std::mutex > lock(inp->m_mutex);
@ -238,13 +239,23 @@ void DemodulatorThread::run() {
ati_vis->type = 0; ati_vis->type = 0;
} }
localAudioVisOutputQueue->push(ati_vis); if (!localAudioVisOutputQueue->push(ati_vis)) {
ati_vis->setRefCount(0);
std::cout << "DemodulatorThread::run() cannot push ati_vis into localAudioVisOutputQueue, is full !" << std::endl;
std::this_thread::yield();
}
} }
if (ati != nullptr) { if (ati != nullptr) {
if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) { if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) {
audioOutputQueue->push(ati);
if (!audioOutputQueue->push(ati)) {
ati->decRefCount();
std::cout << "DemodulatorThread::run() cannot push ati into audioOutputQueue, is full !" << std::endl;
std::this_thread::yield();
}
} else { } else {
ati->setRefCount(0); ati->setRefCount(0);
} }
@ -297,7 +308,9 @@ void DemodulatorThread::run() {
void DemodulatorThread::terminate() { void DemodulatorThread::terminate() {
IOThread::terminate(); IOThread::terminate();
DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue
iqInputQueue->push(inp); if (!iqInputQueue->push(inp)) {
delete inp;
}
} }
bool DemodulatorThread::isMuted() { bool DemodulatorThread::isMuted() {

View File

@ -86,11 +86,10 @@ protected:
output->setRefCount(outputs.size()); output->setRefCount(outputs.size());
for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) { for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) {
if ((*outputs_i)->full()) {
if (!(*outputs_i)->push(output)) {
output->decRefCount(); output->decRefCount();
} else { }
(*outputs_i)->push(output);
}
} }
} }
@ -107,12 +106,16 @@ template<class OutputDataType = ReferenceCounter>
class VisualDataDistributor : public VisualProcessor<OutputDataType, OutputDataType> { class VisualDataDistributor : public VisualProcessor<OutputDataType, OutputDataType> {
protected: protected:
void process() { void process() {
while (!VisualProcessor<OutputDataType, OutputDataType>::input->empty()) { OutputDataType *inp;
while (VisualProcessor<OutputDataType, OutputDataType>::input->try_pop(inp)) {
if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) { if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) {
if (inp) {
inp->decRefCount();
}
return; return;
} }
OutputDataType *inp;
VisualProcessor<OutputDataType, OutputDataType>::input->pop(inp);
if (inp) { if (inp) {
VisualProcessor<OutputDataType, OutputDataType>::distribute(inp); VisualProcessor<OutputDataType, OutputDataType>::distribute(inp);
} }
@ -125,12 +128,15 @@ template<class OutputDataType = ReferenceCounter>
class VisualDataReDistributor : public VisualProcessor<OutputDataType, OutputDataType> { class VisualDataReDistributor : public VisualProcessor<OutputDataType, OutputDataType> {
protected: protected:
void process() { void process() {
while (!VisualProcessor<OutputDataType, OutputDataType>::input->empty()) { OutputDataType *inp;
while (VisualProcessor<OutputDataType, OutputDataType>::input->try_pop(inp)) {
if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) { if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) {
if (inp) {
inp->decRefCount();
}
return; return;
} }
OutputDataType *inp;
VisualProcessor<OutputDataType, OutputDataType>::input->pop(inp);
if (inp) { if (inp) {
OutputDataType *outp = buffers.getBuffer(); OutputDataType *outp = buffers.getBuffer();

View File

@ -86,7 +86,7 @@ void SDRPostThread::updateActiveDemodulators() {
nRunDemods = 0; nRunDemods = 0;
long long centerFreq = wxGetApp().getFrequency(); long long centerFreq = wxGetApp().getFrequency();
for (demod_i = demodulators.begin(); demod_i != demodulators.end(); demod_i++) { for (demod_i = demodulators.begin(); demod_i != demodulators.end(); demod_i++) {
DemodulatorInstance *demod = *demod_i; DemodulatorInstance *demod = *demod_i;
DemodulatorThreadInputQueue *demodQueue = demod->getIQInputDataPipe(); DemodulatorThreadInputQueue *demodQueue = demod->getIQInputDataPipe();
@ -108,7 +108,9 @@ void SDRPostThread::updateActiveDemodulators() {
DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData; DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData;
dummyDataOut->frequency = frequency; dummyDataOut->frequency = frequency;
dummyDataOut->sampleRate = sampleRate; dummyDataOut->sampleRate = sampleRate;
demodQueue->push(dummyDataOut); if (!demodQueue->push(dummyDataOut)) {
delete dummyDataOut;
}
} }
// follow if follow mode // follow if follow mode
@ -119,6 +121,7 @@ void SDRPostThread::updateActiveDemodulators() {
} else if (!demod->isActive()) { // in range, activate if not activated } else if (!demod->isActive()) { // in range, activate if not activated
demod->setActive(true); demod->setActive(true);
if (wxGetApp().getDemodMgr().getLastActiveDemodulator() == NULL) { if (wxGetApp().getDemodMgr().getLastActiveDemodulator() == NULL) {
wxGetApp().getDemodMgr().setActiveDemodulator(demod); wxGetApp().getDemodMgr().setActiveDemodulator(demod);
} }
} }
@ -181,8 +184,6 @@ void SDRPostThread::run() {
iqDataOutQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQDataOutput")); iqDataOutQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQDataOutput"));
iqVisualQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQVisualDataOutput")); iqVisualQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQVisualDataOutput"));
iqActiveDemodVisualQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQActiveDemodVisualDataOutput")); iqActiveDemodVisualQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQActiveDemodVisualDataOutput"));
iqDataInQueue->set_max_num_items(0);
while (!stopping) { while (!stopping) {
SDRThreadIQData *data_in; SDRThreadIQData *data_in;
@ -212,17 +213,16 @@ void SDRPostThread::run() {
} }
} }
//Only update the list of demodulators here
if (doUpdate) { if (doUpdate) {
updateActiveDemodulators(); updateActiveDemodulators();
} }
} //end while } //end while
//TODO: Why only 1 element was removed before ? //Be safe, remove as many elements as possible
DemodulatorThreadIQData *visualDataDummy; DemodulatorThreadIQData *visualDataDummy;
while (iqVisualQueue && iqVisualQueue->try_pop(visualDataDummy)) { while (iqVisualQueue && iqVisualQueue->try_pop(visualDataDummy)) {
//nothing visualDataDummy->decRefCount();
//TODO: What about the refcounts ?
} }
// buffers.purge(); // buffers.purge();
@ -234,7 +234,9 @@ void SDRPostThread::run() {
void SDRPostThread::terminate() { void SDRPostThread::terminate() {
IOThread::terminate(); IOThread::terminate();
SDRThreadIQData *dummy = new SDRThreadIQData; SDRThreadIQData *dummy = new SDRThreadIQData;
iqDataInQueue->push(dummy); if (!iqDataInQueue->push(dummy)) {
delete dummy;
}
} }
void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
@ -295,19 +297,34 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], dataSize, &demodDataOut->data[0]); iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], dataSize, &demodDataOut->data[0]);
if (doDemodVisOut) { if (doDemodVisOut) {
iqActiveDemodVisualQueue->push(demodDataOut); if (!iqActiveDemodVisualQueue->push(demodDataOut)) {
demodDataOut->decRefCount();
std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl;
std::this_thread::yield();
}
} }
if (doIQDataOut) { if (doIQDataOut) {
iqDataOutQueue->push(demodDataOut); if (!iqDataOutQueue->push(demodDataOut)) {
demodDataOut->decRefCount();
std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqDataOutQueue, is full !" << std::endl;
std::this_thread::yield();
}
} }
if (doVisOut) { if (doVisOut) {
iqVisualQueue->push(demodDataOut); if (!iqVisualQueue->push(demodDataOut)) {
demodDataOut->decRefCount();
std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqVisualQueue, is full !" << std::endl;
std::this_thread::yield();
}
} }
for (size_t i = 0; i < nRunDemods; i++) { for (size_t i = 0; i < nRunDemods; i++) {
runDemods[i]->getIQInputDataPipe()->push(demodDataOut); if (!runDemods[i]->getIQInputDataPipe()->push(demodDataOut)) {
demodDataOut->decRefCount();
std::this_thread::yield();
}
} }
} }
} }
@ -345,9 +362,19 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
iqDataOut->sampleRate = data_in->sampleRate; iqDataOut->sampleRate = data_in->sampleRate;
iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize); iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize);
iqDataOutQueue->push(iqDataOut); if (!iqDataOutQueue->push(iqDataOut)) {
if (doVis) { std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqDataOutQueue, is full !" << std::endl;
iqVisualQueue->push(iqDataOut); iqDataOut->decRefCount();
std::this_thread::yield();
}
if (doVis) {
if (!iqVisualQueue->push(iqDataOut)) {
std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqVisualQueue, is full !" << std::endl;
iqDataOut->decRefCount();
std::this_thread::yield();
}
} }
} }
@ -443,13 +470,21 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
} }
if (doDemodVis) { if (doDemodVis) {
iqActiveDemodVisualQueue->push(demodDataOut); if (!iqActiveDemodVisualQueue->push(demodDataOut)) {
std::cout << "SDRPostThread::runPFBCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl;
demodDataOut->decRefCount();
std::this_thread::yield();
}
} }
for (size_t j = 0; j < nRunDemods; j++) { for (size_t j = 0; j < nRunDemods; j++) {
if (demodChannel[j] == i) { if (demodChannel[j] == i) {
DemodulatorInstance *demod = runDemods[j]; DemodulatorInstance *demod = runDemods[j];
demod->getIQInputDataPipe()->push(demodDataOut);
if (!demod->getIQInputDataPipe()->push(demodDataOut)) {
demodDataOut->decRefCount();
std::this_thread::yield();
}
} }
} }
} }

View File

@ -204,7 +204,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
} }
} }
if (n_read > 0 && !stopping) { if (n_read > 0 && !stopping && !iqDataOutQueue->full()) {
SDRThreadIQData *dataOut = buffers.getBuffer(); SDRThreadIQData *dataOut = buffers.getBuffer();
if (iq_swap.load()) { if (iq_swap.load()) {
@ -222,7 +222,16 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
dataOut->dcCorrected = hasHardwareDC.load(); dataOut->dcCorrected = hasHardwareDC.load();
dataOut->numChannels = numChannels.load(); dataOut->numChannels = numChannels.load();
iqDataOutQueue->push(dataOut); if (!iqDataOutQueue->push(dataOut)) {
//The rest of the system saturates,
//finally the push didn't suceeded, recycle dataOut immediatly.
dataOut->setRefCount(0);
std::cout << "SDRThread::readStream(): iqDataOutQueue output queue is full, discard processing ! " << std::endl;
//saturation, let a chance to the other threads to consume the existing samples
std::this_thread::yield();
}
} }
} }

View File

@ -68,11 +68,13 @@ public:
bool push(const value_type& item) { bool push(const value_type& item) {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) {
m_condition.notify_all();
return false; return false;
}
m_queue.push(item); m_queue.push(item);
m_condition.notify_one(); m_condition.notify_all();
return true; return true;
} }
@ -84,11 +86,13 @@ public:
bool push(const value_type&& item) { bool push(const value_type&& item) {
std::lock_guard < std::mutex > lock(m_mutex); std::lock_guard < std::mutex > lock(m_mutex);
if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) {
m_condition.notify_all();
return false; return false;
}
m_queue.push(item); m_queue.push(item);
m_condition.notify_one(); m_condition.notify_all();
return true; return true;
} }