mirror of
https://github.com/cjcliffe/CubicSDR.git
synced 2025-09-06 23:27:53 -04:00
Merge pull request #398 from vsonnier/thread_clean_3
thread_clean_3: Multithreading AudioThread hardening, use of try_pop() and more.
This commit is contained in:
commit
4d2ea8d08a
@ -806,11 +806,11 @@ void AppFrame::OnMenu(wxCommandEvent& event) {
|
|||||||
#endif
|
#endif
|
||||||
else if (event.GetId() == wxID_SDR_START_STOP) {
|
else if (event.GetId() == wxID_SDR_START_STOP) {
|
||||||
if (!wxGetApp().getSDRThread()->isTerminated()) {
|
if (!wxGetApp().getSDRThread()->isTerminated()) {
|
||||||
wxGetApp().stopDevice(true);
|
wxGetApp().stopDevice(true, 2000);
|
||||||
} else {
|
} else {
|
||||||
SDRDeviceInfo *dev = wxGetApp().getDevice();
|
SDRDeviceInfo *dev = wxGetApp().getDevice();
|
||||||
if (dev != nullptr) {
|
if (dev != nullptr) {
|
||||||
wxGetApp().setDevice(dev);
|
wxGetApp().setDevice(dev, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (event.GetId() == wxID_LOW_PERF) {
|
} else if (event.GetId() == wxID_LOW_PERF) {
|
||||||
@ -1645,6 +1645,7 @@ bool AppFrame::loadSession(std::string fileName) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
wxGetApp().getDemodMgr().setActiveDemodulator(nullptr, false);
|
wxGetApp().getDemodMgr().setActiveDemodulator(nullptr, false);
|
||||||
|
|
||||||
wxGetApp().getDemodMgr().terminateAll();
|
wxGetApp().getDemodMgr().terminateAll();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -133,8 +133,8 @@ long long strToFrequency(std::string freqStr) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
CubicSDR::CubicSDR() : appframe(NULL), m_glContext(NULL), frequency(0), offset(0), ppm(0), snap(1), sampleRate(DEFAULT_SAMPLE_RATE),
|
CubicSDR::CubicSDR() : frequency(0), offset(0), ppm(0), snap(1), sampleRate(DEFAULT_SAMPLE_RATE),agcMode(false)
|
||||||
sdrThread(NULL), sdrPostThread(NULL), spectrumVisualThread(NULL), demodVisualThread(NULL), pipeSDRIQData(NULL), pipeIQVisualData(NULL), pipeAudioVisualData(NULL), t_SDR(NULL), t_PostSDR(NULL) {
|
{
|
||||||
sampleRateInitialized.store(false);
|
sampleRateInitialized.store(false);
|
||||||
agcMode.store(true);
|
agcMode.store(true);
|
||||||
soloMode.store(false);
|
soloMode.store(false);
|
||||||
@ -254,6 +254,7 @@ bool CubicSDR::OnInit() {
|
|||||||
|
|
||||||
sdrPostThread = new SDRPostThread();
|
sdrPostThread = new SDRPostThread();
|
||||||
sdrPostThread->setInputQueue("IQDataInput", pipeSDRIQData);
|
sdrPostThread->setInputQueue("IQDataInput", pipeSDRIQData);
|
||||||
|
|
||||||
sdrPostThread->setOutputQueue("IQVisualDataOutput", pipeIQVisualData);
|
sdrPostThread->setOutputQueue("IQVisualDataOutput", pipeIQVisualData);
|
||||||
sdrPostThread->setOutputQueue("IQDataOutput", pipeWaterfallIQVisualData);
|
sdrPostThread->setOutputQueue("IQDataOutput", pipeWaterfallIQVisualData);
|
||||||
sdrPostThread->setOutputQueue("IQActiveDemodVisualDataOutput", pipeDemodIQVisualData);
|
sdrPostThread->setOutputQueue("IQActiveDemodVisualDataOutput", pipeDemodIQVisualData);
|
||||||
@ -289,22 +290,24 @@ int CubicSDR::OnExit() {
|
|||||||
stopRig();
|
stopRig();
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
demodMgr.terminateAll();
|
//The thread feeding them all should be terminated first, so:
|
||||||
|
|
||||||
std::cout << "Terminating SDR thread.." << std::endl;
|
std::cout << "Terminating SDR thread.." << std::endl;
|
||||||
sdrThread->terminate();
|
sdrThread->terminate();
|
||||||
sdrThread->isTerminated(1000);
|
sdrThread->isTerminated(3000);
|
||||||
|
|
||||||
if (t_SDR) {
|
if (t_SDR) {
|
||||||
t_SDR->join();
|
t_SDR->join();
|
||||||
delete t_SDR;
|
delete t_SDR;
|
||||||
t_SDR = nullptr;
|
t_SDR = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "Terminating SDR post-processing thread.." << std::endl;
|
std::cout << "Terminating SDR post-processing thread.." << std::endl;
|
||||||
sdrPostThread->terminate();
|
sdrPostThread->terminate();
|
||||||
|
|
||||||
|
std::cout << "Terminating All Demodulators.." << std::endl;
|
||||||
|
demodMgr.terminateAll();
|
||||||
|
|
||||||
std::cout << "Terminating Visual Processor threads.." << std::endl;
|
std::cout << "Terminating Visual Processor threads.." << std::endl;
|
||||||
spectrumVisualThread->terminate();
|
spectrumVisualThread->terminate();
|
||||||
demodVisualThread->terminate();
|
demodVisualThread->terminate();
|
||||||
@ -542,16 +545,11 @@ void CubicSDR::setSampleRate(long long rate_in) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CubicSDR::stopDevice(bool store) {
|
void CubicSDR::stopDevice(bool store, int waitMsForTermination) {
|
||||||
if (store) {
|
|
||||||
stoppedDev = sdrThread->getDevice();
|
//Firt we must stop the threads
|
||||||
} else {
|
|
||||||
stoppedDev = nullptr;
|
|
||||||
}
|
|
||||||
sdrThread->setDevice(nullptr);
|
|
||||||
|
|
||||||
sdrThread->terminate();
|
sdrThread->terminate();
|
||||||
sdrThread->isTerminated(1000);
|
sdrThread->isTerminated(waitMsForTermination);
|
||||||
|
|
||||||
if (t_SDR) {
|
if (t_SDR) {
|
||||||
t_SDR->join();
|
t_SDR->join();
|
||||||
@ -559,6 +557,15 @@ void CubicSDR::stopDevice(bool store) {
|
|||||||
t_SDR = nullptr;
|
t_SDR = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Only now we can nullify devices
|
||||||
|
if (store) {
|
||||||
|
stoppedDev = sdrThread->getDevice();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
stoppedDev = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdrThread->setDevice(nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void CubicSDR::reEnumerateDevices() {
|
void CubicSDR::reEnumerateDevices() {
|
||||||
@ -568,10 +575,10 @@ void CubicSDR::reEnumerateDevices() {
|
|||||||
t_SDREnum = new std::thread(&SDREnumerator::threadMain, sdrEnum);
|
t_SDREnum = new std::thread(&SDREnumerator::threadMain, sdrEnum);
|
||||||
}
|
}
|
||||||
|
|
||||||
void CubicSDR::setDevice(SDRDeviceInfo *dev) {
|
void CubicSDR::setDevice(SDRDeviceInfo *dev, int waitMsForTermination) {
|
||||||
|
|
||||||
sdrThread->terminate();
|
sdrThread->terminate();
|
||||||
sdrThread->isTerminated(1000);
|
sdrThread->isTerminated(waitMsForTermination);
|
||||||
|
|
||||||
if (t_SDR) {
|
if (t_SDR) {
|
||||||
t_SDR->join();
|
t_SDR->join();
|
||||||
|
@ -98,8 +98,8 @@ public:
|
|||||||
long long getSampleRate();
|
long long getSampleRate();
|
||||||
|
|
||||||
std::vector<SDRDeviceInfo *> *getDevices();
|
std::vector<SDRDeviceInfo *> *getDevices();
|
||||||
void setDevice(SDRDeviceInfo *dev);
|
void setDevice(SDRDeviceInfo *dev, int waitMsForTermination);
|
||||||
void stopDevice(bool store);
|
void stopDevice(bool store, int waitMsForTermination);
|
||||||
SDRDeviceInfo * getDevice();
|
SDRDeviceInfo * getDevice();
|
||||||
|
|
||||||
ScopeVisualProcessor *getScopeProcessor();
|
ScopeVisualProcessor *getScopeProcessor();
|
||||||
@ -173,10 +173,10 @@ public:
|
|||||||
private:
|
private:
|
||||||
int FilterEvent(wxEvent& event);
|
int FilterEvent(wxEvent& event);
|
||||||
|
|
||||||
AppFrame *appframe;
|
AppFrame *appframe = nullptr;
|
||||||
AppConfig config;
|
AppConfig config;
|
||||||
PrimaryGLContext *m_glContext;
|
PrimaryGLContext *m_glContext = nullptr;
|
||||||
std::vector<SDRDeviceInfo *> *devs;
|
std::vector<SDRDeviceInfo *> *devs = nullptr;
|
||||||
|
|
||||||
DemodulatorMgr demodMgr;
|
DemodulatorMgr demodMgr;
|
||||||
|
|
||||||
@ -186,27 +186,31 @@ private:
|
|||||||
std::atomic_llong sampleRate;
|
std::atomic_llong sampleRate;
|
||||||
std::atomic_bool agcMode;
|
std::atomic_bool agcMode;
|
||||||
|
|
||||||
SDRThread *sdrThread;
|
SDRThread *sdrThread = nullptr;
|
||||||
SDREnumerator *sdrEnum;
|
SDREnumerator *sdrEnum = nullptr;
|
||||||
SDRPostThread *sdrPostThread;
|
SDRPostThread *sdrPostThread = nullptr;
|
||||||
SpectrumVisualDataThread *spectrumVisualThread;
|
SpectrumVisualDataThread *spectrumVisualThread = nullptr;
|
||||||
SpectrumVisualDataThread *demodVisualThread;
|
SpectrumVisualDataThread *demodVisualThread = nullptr;
|
||||||
|
|
||||||
SDRThreadIQDataQueue* pipeSDRIQData;
|
SDRThreadIQDataQueue* pipeSDRIQData = nullptr;
|
||||||
DemodulatorThreadInputQueue* pipeIQVisualData;
|
DemodulatorThreadInputQueue* pipeIQVisualData = nullptr;
|
||||||
DemodulatorThreadOutputQueue* pipeAudioVisualData;
|
DemodulatorThreadOutputQueue* pipeAudioVisualData = nullptr;
|
||||||
DemodulatorThreadInputQueue* pipeDemodIQVisualData;
|
DemodulatorThreadInputQueue* pipeDemodIQVisualData = nullptr;
|
||||||
DemodulatorThreadInputQueue* pipeWaterfallIQVisualData;
|
DemodulatorThreadInputQueue* pipeWaterfallIQVisualData = nullptr;
|
||||||
DemodulatorThreadInputQueue* pipeActiveDemodIQVisualData;
|
DemodulatorThreadInputQueue* pipeActiveDemodIQVisualData = nullptr;
|
||||||
|
|
||||||
ScopeVisualProcessor scopeProcessor;
|
ScopeVisualProcessor scopeProcessor;
|
||||||
|
|
||||||
SDRDevicesDialog *deviceSelectorDialog;
|
SDRDevicesDialog *deviceSelectorDialog = nullptr;
|
||||||
|
|
||||||
SoapySDR::Kwargs streamArgs;
|
SoapySDR::Kwargs streamArgs;
|
||||||
SoapySDR::Kwargs settingArgs;
|
SoapySDR::Kwargs settingArgs;
|
||||||
|
|
||||||
std::thread *t_SDR, *t_SDREnum, *t_PostSDR, *t_SpectrumVisual, *t_DemodVisual;
|
std::thread *t_SDR = nullptr;
|
||||||
|
std::thread *t_SDREnum = nullptr;
|
||||||
|
std::thread *t_PostSDR = nullptr;
|
||||||
|
std::thread *t_SpectrumVisual = nullptr;
|
||||||
|
std::thread *t_DemodVisual = nullptr;
|
||||||
std::atomic_bool devicesReady;
|
std::atomic_bool devicesReady;
|
||||||
std::atomic_bool devicesFailed;
|
std::atomic_bool devicesFailed;
|
||||||
std::atomic_bool deviceSelectorOpen;
|
std::atomic_bool deviceSelectorOpen;
|
||||||
@ -224,8 +228,8 @@ private:
|
|||||||
std::atomic_bool soloMode;
|
std::atomic_bool soloMode;
|
||||||
SDRDeviceInfo *stoppedDev;
|
SDRDeviceInfo *stoppedDev;
|
||||||
#ifdef USE_HAMLIB
|
#ifdef USE_HAMLIB
|
||||||
RigThread* rigThread;
|
RigThread* rigThread = nullptr;
|
||||||
std::thread *t_Rig;
|
std::thread *t_Rig = nullptr;
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -6,44 +6,54 @@
|
|||||||
#include "DemodulatorThread.h"
|
#include "DemodulatorThread.h"
|
||||||
#include "DemodulatorInstance.h"
|
#include "DemodulatorInstance.h"
|
||||||
#include <memory.h>
|
#include <memory.h>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
|
||||||
std::map<int, AudioThread *> AudioThread::deviceController;
|
std::map<int, AudioThread *> AudioThread::deviceController;
|
||||||
std::map<int, int> AudioThread::deviceSampleRate;
|
std::map<int, int> AudioThread::deviceSampleRate;
|
||||||
std::map<int, std::thread *> AudioThread::deviceThread;
|
std::map<int, std::thread *> AudioThread::deviceThread;
|
||||||
|
|
||||||
AudioThread::AudioThread() : IOThread(),
|
AudioThread::AudioThread() : IOThread(),
|
||||||
currentInput(NULL), inputQueue(NULL), nBufferFrames(1024), sampleRate(0) {
|
currentInput(nullptr), inputQueue(nullptr), nBufferFrames(1024), sampleRate(0) {
|
||||||
|
|
||||||
audioQueuePtr.store(0);
|
audioQueuePtr.store(0);
|
||||||
underflowCount.store(0);
|
underflowCount.store(0);
|
||||||
active.store(false);
|
active.store(false);
|
||||||
outputDevice.store(-1);
|
outputDevice.store(-1);
|
||||||
gain.store(1.0);
|
gain = 1.0;
|
||||||
|
|
||||||
vBoundThreads = new std::vector<AudioThread *>;
|
|
||||||
boundThreads.store(vBoundThreads);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
AudioThread::~AudioThread() {
|
AudioThread::~AudioThread() {
|
||||||
boundThreads.store(nullptr);
|
|
||||||
delete vBoundThreads;
|
}
|
||||||
|
|
||||||
|
std::recursive_mutex & AudioThread::getMutex()
|
||||||
|
{
|
||||||
|
return m_mutex;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AudioThread::bindThread(AudioThread *other) {
|
void AudioThread::bindThread(AudioThread *other) {
|
||||||
if (std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other) == boundThreads.load()->end()) {
|
|
||||||
boundThreads.load()->push_back(other);
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
|
if (std::find(boundThreads.begin(), boundThreads.end(), other) == boundThreads.end()) {
|
||||||
|
boundThreads.push_back(other);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void AudioThread::removeThread(AudioThread *other) {
|
void AudioThread::removeThread(AudioThread *other) {
|
||||||
|
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
std::vector<AudioThread *>::iterator i;
|
std::vector<AudioThread *>::iterator i;
|
||||||
i = std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other);
|
i = std::find(boundThreads.begin(), boundThreads.end(), other);
|
||||||
if (i != boundThreads.load()->end()) {
|
if (i != boundThreads.end()) {
|
||||||
boundThreads.load()->erase(i);
|
boundThreads.erase(i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void AudioThread::deviceCleanup() {
|
void AudioThread::deviceCleanup() {
|
||||||
|
|
||||||
std::map<int, AudioThread *>::iterator i;
|
std::map<int, AudioThread *>::iterator i;
|
||||||
|
|
||||||
for (i = deviceController.begin(); i != deviceController.end(); i++) {
|
for (i = deviceController.begin(); i != deviceController.end(); i++) {
|
||||||
@ -53,53 +63,67 @@ void AudioThread::deviceCleanup() {
|
|||||||
|
|
||||||
static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status,
|
static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status,
|
||||||
void *userData) {
|
void *userData) {
|
||||||
AudioThread *src = (AudioThread *) userData;
|
|
||||||
float *out = (float*) outputBuffer;
|
float *out = (float*)outputBuffer;
|
||||||
|
|
||||||
|
//Zero output buffer in all cases: this allow to mute audio if no AudioThread data is
|
||||||
|
//actually active.
|
||||||
memset(out, 0, nBufferFrames * 2 * sizeof(float));
|
memset(out, 0, nBufferFrames * 2 * sizeof(float));
|
||||||
|
|
||||||
|
AudioThread *src = (AudioThread *) userData;
|
||||||
|
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(src->getMutex());
|
||||||
|
|
||||||
if (src->isTerminated()) {
|
if (src->isTerminated()) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status) {
|
if (status) {
|
||||||
std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl;
|
std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (src->boundThreads.load()->empty()) {
|
if (src->boundThreads.empty()) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
float peak = 0.0;
|
|
||||||
|
double peak = 0.0;
|
||||||
|
|
||||||
|
//for all boundThreads
|
||||||
|
for (size_t j = 0; j < src->boundThreads.size(); j++) {
|
||||||
|
|
||||||
|
AudioThread *srcmix = src->boundThreads[j];
|
||||||
|
|
||||||
|
//lock every single boundThread srcmix in succession the time we process
|
||||||
|
//its audio samples.
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(srcmix->getMutex());
|
||||||
|
|
||||||
for (size_t j = 0; j < src->boundThreads.load()->size(); j++) {
|
|
||||||
AudioThread *srcmix = (*(src->boundThreads.load()))[j];
|
|
||||||
if (srcmix->isTerminated() || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) {
|
if (srcmix->isTerminated() || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!srcmix->currentInput) {
|
if (!srcmix->currentInput) {
|
||||||
srcmix->audioQueuePtr = 0;
|
srcmix->audioQueuePtr = 0;
|
||||||
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
|
|
||||||
continue;
|
if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
|
||||||
}
|
|
||||||
srcmix->inputQueue->pop(srcmix->currentInput);
|
|
||||||
if (srcmix->isTerminated()) {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (srcmix->currentInput->sampleRate != src->getSampleRate()) {
|
if (srcmix->currentInput->sampleRate != src->getSampleRate()) {
|
||||||
while (srcmix->inputQueue->size()) {
|
|
||||||
srcmix->inputQueue->pop(srcmix->currentInput);
|
while (srcmix->inputQueue->try_pop(srcmix->currentInput)) {
|
||||||
|
|
||||||
if (srcmix->currentInput) {
|
if (srcmix->currentInput) {
|
||||||
if (srcmix->currentInput->sampleRate == src->getSampleRate()) {
|
if (srcmix->currentInput->sampleRate == src->getSampleRate()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
srcmix->currentInput->decRefCount();
|
srcmix->currentInput->decRefCount();
|
||||||
}
|
}
|
||||||
srcmix->currentInput = NULL;
|
srcmix->currentInput = nullptr;
|
||||||
}
|
} //end while
|
||||||
|
|
||||||
srcmix->audioQueuePtr = 0;
|
srcmix->audioQueuePtr = 0;
|
||||||
|
|
||||||
@ -114,37 +138,35 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
|
|||||||
srcmix->audioQueuePtr = 0;
|
srcmix->audioQueuePtr = 0;
|
||||||
if (srcmix->currentInput) {
|
if (srcmix->currentInput) {
|
||||||
srcmix->currentInput->decRefCount();
|
srcmix->currentInput->decRefCount();
|
||||||
srcmix->currentInput = NULL;
|
srcmix->currentInput = nullptr;
|
||||||
}
|
}
|
||||||
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
|
|
||||||
|
if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
srcmix->inputQueue->pop(srcmix->currentInput);
|
|
||||||
if (srcmix->isTerminated()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
float mixPeak = srcmix->currentInput->peak * srcmix->gain;
|
double mixPeak = srcmix->currentInput->peak * srcmix->gain;
|
||||||
|
|
||||||
if (srcmix->currentInput->channels == 1) {
|
if (srcmix->currentInput->channels == 1) {
|
||||||
|
|
||||||
for (unsigned int i = 0; i < nBufferFrames; i++) {
|
for (unsigned int i = 0; i < nBufferFrames; i++) {
|
||||||
|
|
||||||
if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) {
|
if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) {
|
||||||
srcmix->audioQueuePtr = 0;
|
srcmix->audioQueuePtr = 0;
|
||||||
if (srcmix->currentInput) {
|
if (srcmix->currentInput) {
|
||||||
srcmix->currentInput->decRefCount();
|
srcmix->currentInput->decRefCount();
|
||||||
srcmix->currentInput = NULL;
|
srcmix->currentInput = nullptr;
|
||||||
}
|
}
|
||||||
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
|
|
||||||
|
if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
srcmix->inputQueue->pop(srcmix->currentInput);
|
|
||||||
if (srcmix->isTerminated()) {
|
|
||||||
break;
|
double srcPeak = srcmix->currentInput->peak * srcmix->gain;
|
||||||
}
|
|
||||||
float srcPeak = srcmix->currentInput->peak * srcmix->gain;
|
|
||||||
if (mixPeak < srcPeak) {
|
if (mixPeak < srcPeak) {
|
||||||
mixPeak = srcPeak;
|
mixPeak = srcPeak;
|
||||||
}
|
}
|
||||||
@ -158,25 +180,25 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int i = 0, iMax = srcmix->currentInput->channels * nBufferFrames; i < iMax; i++) {
|
for (int i = 0, iMax = srcmix->currentInput->channels * nBufferFrames; i < iMax; i++) {
|
||||||
|
|
||||||
if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) {
|
if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) {
|
||||||
srcmix->audioQueuePtr = 0;
|
srcmix->audioQueuePtr = 0;
|
||||||
if (srcmix->currentInput) {
|
if (srcmix->currentInput) {
|
||||||
srcmix->currentInput->decRefCount();
|
srcmix->currentInput->decRefCount();
|
||||||
srcmix->currentInput = NULL;
|
srcmix->currentInput = nullptr;
|
||||||
}
|
}
|
||||||
if (srcmix->isTerminated() || srcmix->inputQueue->empty()) {
|
|
||||||
|
if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
srcmix->inputQueue->pop(srcmix->currentInput);
|
|
||||||
if (srcmix->isTerminated()) {
|
double srcPeak = srcmix->currentInput->peak * srcmix->gain;
|
||||||
break;
|
|
||||||
}
|
|
||||||
float srcPeak = srcmix->currentInput->peak * srcmix->gain;
|
|
||||||
if (mixPeak < srcPeak) {
|
if (mixPeak < srcPeak) {
|
||||||
mixPeak = srcPeak;
|
mixPeak = srcPeak;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (srcmix->currentInput && srcmix->currentInput->data.size()) {
|
if (srcmix->currentInput && srcmix->currentInput->data.size()) {
|
||||||
|
|
||||||
out[i] = out[i] + srcmix->currentInput->data[srcmix->audioQueuePtr] * srcmix->gain;
|
out[i] = out[i] + srcmix->currentInput->data[srcmix->audioQueuePtr] * srcmix->gain;
|
||||||
}
|
}
|
||||||
srcmix->audioQueuePtr++;
|
srcmix->audioQueuePtr++;
|
||||||
@ -186,11 +208,15 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
|
|||||||
peak += mixPeak;
|
peak += mixPeak;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//normalize volume
|
||||||
if (peak > 1.0) {
|
if (peak > 1.0) {
|
||||||
|
float invPeak = (float)(1.0 / peak);
|
||||||
|
|
||||||
for (unsigned int i = 0; i < nBufferFrames * 2; i++) {
|
for (unsigned int i = 0; i < nBufferFrames * 2; i++) {
|
||||||
out[i] /= peak;
|
out[i] *= invPeak;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,6 +273,8 @@ void AudioThread::enumerateDevices(std::vector<RtAudio::DeviceInfo> &devs) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
|
void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
|
||||||
|
|
||||||
|
|
||||||
if (deviceController.find(deviceId) != deviceController.end()) {
|
if (deviceController.find(deviceId) != deviceController.end()) {
|
||||||
AudioThreadCommand refreshDevice;
|
AudioThreadCommand refreshDevice;
|
||||||
refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE;
|
refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE;
|
||||||
@ -256,14 +284,17 @@ void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void AudioThread::setSampleRate(int sampleRate) {
|
void AudioThread::setSampleRate(int sampleRate) {
|
||||||
|
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
if (deviceController[outputDevice.load()] == this) {
|
if (deviceController[outputDevice.load()] == this) {
|
||||||
deviceSampleRate[outputDevice.load()] = sampleRate;
|
deviceSampleRate[outputDevice.load()] = sampleRate;
|
||||||
|
|
||||||
dac.stopStream();
|
dac.stopStream();
|
||||||
dac.closeStream();
|
dac.closeStream();
|
||||||
|
|
||||||
for (size_t j = 0; j < boundThreads.load()->size(); j++) {
|
for (size_t j = 0; j < boundThreads.size(); j++) {
|
||||||
AudioThread *srcmix = (*(boundThreads.load()))[j];
|
AudioThread *srcmix = boundThreads[j];
|
||||||
srcmix->setSampleRate(sampleRate);
|
srcmix->setSampleRate(sampleRate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -286,10 +317,15 @@ void AudioThread::setSampleRate(int sampleRate) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int AudioThread::getSampleRate() {
|
int AudioThread::getSampleRate() {
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
return this->sampleRate;
|
return this->sampleRate;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AudioThread::setupDevice(int deviceId) {
|
void AudioThread::setupDevice(int deviceId) {
|
||||||
|
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
parameters.deviceId = deviceId;
|
parameters.deviceId = deviceId;
|
||||||
parameters.nChannels = 2;
|
parameters.nChannels = 2;
|
||||||
parameters.firstChannel = 0;
|
parameters.firstChannel = 0;
|
||||||
@ -323,6 +359,7 @@ void AudioThread::setupDevice(int deviceId) {
|
|||||||
|
|
||||||
deviceThread[parameters.deviceId] = new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId]);
|
deviceThread[parameters.deviceId] = new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId]);
|
||||||
} else if (deviceController[parameters.deviceId] == this) {
|
} else if (deviceController[parameters.deviceId] == this) {
|
||||||
|
//Attach callback
|
||||||
dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *) this, &opts);
|
dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *) this, &opts);
|
||||||
dac.startStream();
|
dac.startStream();
|
||||||
} else {
|
} else {
|
||||||
@ -340,6 +377,8 @@ void AudioThread::setupDevice(int deviceId) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int AudioThread::getOutputDevice() {
|
int AudioThread::getOutputDevice() {
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
if (outputDevice == -1) {
|
if (outputDevice == -1) {
|
||||||
return dac.getDefaultOutputDevice();
|
return dac.getDefaultOutputDevice();
|
||||||
}
|
}
|
||||||
@ -347,6 +386,9 @@ int AudioThread::getOutputDevice() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void AudioThread::setInitOutputDevice(int deviceId, int sampleRate) {
|
void AudioThread::setInitOutputDevice(int deviceId, int sampleRate) {
|
||||||
|
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
outputDevice = deviceId;
|
outputDevice = deviceId;
|
||||||
if (sampleRate == -1) {
|
if (sampleRate == -1) {
|
||||||
if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) {
|
if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) {
|
||||||
@ -379,8 +421,10 @@ void AudioThread::run() {
|
|||||||
|
|
||||||
inputQueue = static_cast<AudioThreadInputQueue *>(getInputQueue("AudioDataInput"));
|
inputQueue = static_cast<AudioThreadInputQueue *>(getInputQueue("AudioDataInput"));
|
||||||
|
|
||||||
|
//Infinite loop, witing for commands or for termination
|
||||||
while (!stopping) {
|
while (!stopping) {
|
||||||
AudioThreadCommand command;
|
AudioThreadCommand command;
|
||||||
|
|
||||||
cmdQueue.pop(command);
|
cmdQueue.pop(command);
|
||||||
|
|
||||||
if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE) {
|
if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE) {
|
||||||
@ -391,20 +435,26 @@ void AudioThread::run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drain any remaining inputs
|
//Thread termination, prevent fancy things to happen, lock the whole thing:
|
||||||
if (inputQueue) while (!inputQueue->empty()) {
|
//This way audioThreadCallback is rightly protected from thread termination
|
||||||
AudioThreadInput *ref;
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
inputQueue->pop(ref);
|
|
||||||
|
// Drain any remaining inputs, with a non-blocking pop
|
||||||
|
AudioThreadInput *ref;
|
||||||
|
while (inputQueue && inputQueue->try_pop(ref)) {
|
||||||
|
|
||||||
if (ref) {
|
if (ref) {
|
||||||
ref->decRefCount();
|
ref->decRefCount();
|
||||||
}
|
}
|
||||||
}
|
} //end while
|
||||||
|
|
||||||
|
//Nullify currentInput...
|
||||||
if (currentInput) {
|
if (currentInput) {
|
||||||
currentInput->setRefCount(0);
|
currentInput->setRefCount(0);
|
||||||
currentInput = nullptr;
|
currentInput = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Stop
|
||||||
if (deviceController[parameters.deviceId] != this) {
|
if (deviceController[parameters.deviceId] != this) {
|
||||||
deviceController[parameters.deviceId]->removeThread(this);
|
deviceController[parameters.deviceId]->removeThread(this);
|
||||||
} else {
|
} else {
|
||||||
@ -430,10 +480,14 @@ void AudioThread::terminate() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool AudioThread::isActive() {
|
bool AudioThread::isActive() {
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
return active;
|
return active;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AudioThread::setActive(bool state) {
|
void AudioThread::setActive(bool state) {
|
||||||
|
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
AudioThreadInput *dummy;
|
AudioThreadInput *dummy;
|
||||||
if (state && !active && inputQueue) {
|
if (state && !active && inputQueue) {
|
||||||
@ -444,8 +498,9 @@ void AudioThread::setActive(bool state) {
|
|||||||
|
|
||||||
// Activity state changing, clear any inputs
|
// Activity state changing, clear any inputs
|
||||||
if(inputQueue) {
|
if(inputQueue) {
|
||||||
while (!inputQueue->empty()) { // flush queue
|
|
||||||
inputQueue->pop(dummy);
|
while (inputQueue->try_pop(dummy)) { // flush queue, non-blocking pop
|
||||||
|
|
||||||
if (dummy) {
|
if (dummy) {
|
||||||
dummy->decRefCount();
|
dummy->decRefCount();
|
||||||
}
|
}
|
||||||
@ -459,6 +514,9 @@ AudioThreadCommandQueue *AudioThread::getCommandQueue() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void AudioThread::setGain(float gain_in) {
|
void AudioThread::setGain(float gain_in) {
|
||||||
|
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
if (gain < 0.0) {
|
if (gain < 0.0) {
|
||||||
gain = 0.0;
|
gain = 0.0;
|
||||||
}
|
}
|
||||||
@ -469,5 +527,8 @@ void AudioThread::setGain(float gain_in) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
float AudioThread::getGain() {
|
float AudioThread::getGain() {
|
||||||
|
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(m_mutex);
|
||||||
|
|
||||||
return gain;
|
return gain;
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,7 @@ public:
|
|||||||
std::atomic_bool initialized;
|
std::atomic_bool initialized;
|
||||||
std::atomic_bool active;
|
std::atomic_bool active;
|
||||||
std::atomic_int outputDevice;
|
std::atomic_int outputDevice;
|
||||||
std::atomic<float> gain;
|
float gain;
|
||||||
|
|
||||||
AudioThread();
|
AudioThread();
|
||||||
~AudioThread();
|
~AudioThread();
|
||||||
@ -88,7 +88,13 @@ private:
|
|||||||
AudioThreadCommandQueue cmdQueue;
|
AudioThreadCommandQueue cmdQueue;
|
||||||
int sampleRate;
|
int sampleRate;
|
||||||
|
|
||||||
|
//The own m_mutex protecting this AudioThread, in particular boundThreads
|
||||||
|
std::recursive_mutex m_mutex;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
//give access to the this AudioThread lock
|
||||||
|
std::recursive_mutex& getMutex();
|
||||||
|
|
||||||
void bindThread(AudioThread *other);
|
void bindThread(AudioThread *other);
|
||||||
void removeThread(AudioThread *other);
|
void removeThread(AudioThread *other);
|
||||||
|
|
||||||
@ -97,7 +103,8 @@ public:
|
|||||||
static std::map<int,std::thread *> deviceThread;
|
static std::map<int,std::thread *> deviceThread;
|
||||||
static void deviceCleanup();
|
static void deviceCleanup();
|
||||||
static void setDeviceSampleRate(int deviceId, int sampleRate);
|
static void setDeviceSampleRate(int deviceId, int sampleRate);
|
||||||
std::atomic<std::vector<AudioThread *> *> boundThreads;
|
|
||||||
std::vector<AudioThread *> *vBoundThreads;
|
//protected by m_mutex
|
||||||
|
std::vector<AudioThread *> boundThreads;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -152,7 +152,6 @@ bool DemodulatorInstance::isTerminated() {
|
|||||||
bool demodTerminated = demodulatorThread->isTerminated();
|
bool demodTerminated = demodulatorThread->isTerminated();
|
||||||
bool preDemodTerminated = demodulatorPreThread->isTerminated();
|
bool preDemodTerminated = demodulatorPreThread->isTerminated();
|
||||||
|
|
||||||
|
|
||||||
//Cleanup the worker threads, if the threads are indeed terminated
|
//Cleanup the worker threads, if the threads are indeed terminated
|
||||||
if (audioTerminated) {
|
if (audioTerminated) {
|
||||||
|
|
||||||
@ -168,7 +167,6 @@ bool DemodulatorInstance::isTerminated() {
|
|||||||
if (demodTerminated) {
|
if (demodTerminated) {
|
||||||
|
|
||||||
if (t_Demod) {
|
if (t_Demod) {
|
||||||
|
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
pthread_join(t_Demod, nullptr);
|
pthread_join(t_Demod, nullptr);
|
||||||
#else
|
#else
|
||||||
@ -185,8 +183,8 @@ bool DemodulatorInstance::isTerminated() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (preDemodTerminated) {
|
if (preDemodTerminated) {
|
||||||
|
|
||||||
if (t_PreDemod) {
|
if (t_PreDemod) {
|
||||||
|
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
pthread_join(t_PreDemod, NULL);
|
pthread_join(t_PreDemod, NULL);
|
||||||
@ -195,10 +193,9 @@ bool DemodulatorInstance::isTerminated() {
|
|||||||
delete t_PreDemod;
|
delete t_PreDemod;
|
||||||
#endif
|
#endif
|
||||||
t_PreDemod = nullptr;
|
t_PreDemod = nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool terminated = audioTerminated && demodTerminated && preDemodTerminated;
|
bool terminated = audioTerminated && demodTerminated && preDemodTerminated;
|
||||||
|
|
||||||
return terminated;
|
return terminated;
|
||||||
|
@ -136,13 +136,13 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) {
|
|||||||
i = std::find(demods.begin(), demods.end(), demod);
|
i = std::find(demods.begin(), demods.end(), demod);
|
||||||
|
|
||||||
if (activeDemodulator == demod) {
|
if (activeDemodulator == demod) {
|
||||||
activeDemodulator = NULL;
|
activeDemodulator = nullptr;
|
||||||
}
|
}
|
||||||
if (lastActiveDemodulator == demod) {
|
if (lastActiveDemodulator == demod) {
|
||||||
lastActiveDemodulator = NULL;
|
lastActiveDemodulator = nullptr;
|
||||||
}
|
}
|
||||||
if (activeVisualDemodulator == demod) {
|
if (activeVisualDemodulator == demod) {
|
||||||
activeVisualDemodulator = NULL;
|
activeVisualDemodulator = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i != demods.end()) {
|
if (i != demods.end()) {
|
||||||
@ -150,6 +150,7 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Ask for termination
|
//Ask for termination
|
||||||
|
demod->setActive(false);
|
||||||
demod->terminate();
|
demod->terminate();
|
||||||
|
|
||||||
//Do not cleanup immediatly
|
//Do not cleanup immediatly
|
||||||
@ -200,27 +201,28 @@ bool DemodulatorMgr::anyDemodulatorsAt(long long freq, int bandwidth) {
|
|||||||
|
|
||||||
|
|
||||||
void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool temporary) {
|
void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool temporary) {
|
||||||
std::lock_guard < std::recursive_mutex > lock(demods_busy);
|
|
||||||
if (!temporary) {
|
if (!temporary) {
|
||||||
if (activeDemodulator != NULL) {
|
if (activeDemodulator.load() != nullptr) {
|
||||||
lastActiveDemodulator = activeDemodulator;
|
lastActiveDemodulator = activeDemodulator.load();
|
||||||
updateLastState();
|
updateLastState();
|
||||||
} else {
|
} else {
|
||||||
lastActiveDemodulator = demod;
|
lastActiveDemodulator = demod;
|
||||||
}
|
}
|
||||||
updateLastState();
|
updateLastState();
|
||||||
#if USE_HAMLIB
|
#if USE_HAMLIB
|
||||||
if (wxGetApp().rigIsActive() && wxGetApp().getRigThread()->getFollowModem() && lastActiveDemodulator) {
|
if (wxGetApp().rigIsActive() && wxGetApp().getRigThread()->getFollowModem() && lastActiveDemodulator.load()) {
|
||||||
wxGetApp().getRigThread()->setFrequency(lastActiveDemodulator->getFrequency(),true);
|
wxGetApp().getRigThread()->setFrequency(lastActiveDemodulator.load()->getFrequency(),true);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
|
std::lock_guard < std::recursive_mutex > lock(demods_busy);
|
||||||
garbageCollect();
|
garbageCollect();
|
||||||
ReBufferGC::garbageCollect();
|
ReBufferGC::garbageCollect();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (activeVisualDemodulator) {
|
if (activeVisualDemodulator.load()) {
|
||||||
activeVisualDemodulator->setVisualOutputQueue(NULL);
|
activeVisualDemodulator.load()->setVisualOutputQueue(nullptr);
|
||||||
}
|
}
|
||||||
if (demod) {
|
if (demod) {
|
||||||
demod->setVisualOutputQueue(wxGetApp().getAudioVisualQueue());
|
demod->setVisualOutputQueue(wxGetApp().getAudioVisualQueue());
|
||||||
@ -238,7 +240,7 @@ void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool tempo
|
|||||||
}
|
}
|
||||||
|
|
||||||
DemodulatorInstance *DemodulatorMgr::getActiveDemodulator() {
|
DemodulatorInstance *DemodulatorMgr::getActiveDemodulator() {
|
||||||
if (activeDemodulator && !activeDemodulator->isActive()) {
|
if (activeDemodulator.load() && !activeDemodulator.load()->isActive()) {
|
||||||
activeDemodulator = getLastActiveDemodulator();
|
activeDemodulator = getLastActiveDemodulator();
|
||||||
}
|
}
|
||||||
return activeDemodulator;
|
return activeDemodulator;
|
||||||
@ -262,8 +264,6 @@ void DemodulatorMgr::garbageCollect() {
|
|||||||
std::cout << "Garbage collected demodulator instance " << deleted->getLabel() << std::endl;
|
std::cout << "Garbage collected demodulator instance " << deleted->getLabel() << std::endl;
|
||||||
|
|
||||||
delete deleted;
|
delete deleted;
|
||||||
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -273,27 +273,28 @@ void DemodulatorMgr::garbageCollect() {
|
|||||||
|
|
||||||
void DemodulatorMgr::updateLastState() {
|
void DemodulatorMgr::updateLastState() {
|
||||||
std::lock_guard < std::recursive_mutex > lock(demods_busy);
|
std::lock_guard < std::recursive_mutex > lock(demods_busy);
|
||||||
|
|
||||||
if (std::find(demods.begin(), demods.end(), lastActiveDemodulator) == demods.end()) {
|
if (std::find(demods.begin(), demods.end(), lastActiveDemodulator) == demods.end()) {
|
||||||
if (activeDemodulator && activeDemodulator->isActive()) {
|
if (activeDemodulator.load() && activeDemodulator.load()->isActive()) {
|
||||||
lastActiveDemodulator = activeDemodulator;
|
lastActiveDemodulator = activeDemodulator.load();
|
||||||
} else if (activeDemodulator && !activeDemodulator->isActive()){
|
} else if (activeDemodulator.load() && !activeDemodulator.load()->isActive()){
|
||||||
activeDemodulator = NULL;
|
activeDemodulator = nullptr;
|
||||||
lastActiveDemodulator = NULL;
|
lastActiveDemodulator = nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lastActiveDemodulator && !lastActiveDemodulator->isActive()) {
|
if (lastActiveDemodulator.load() && !lastActiveDemodulator.load()->isActive()) {
|
||||||
lastActiveDemodulator = NULL;
|
lastActiveDemodulator = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lastActiveDemodulator) {
|
if (lastActiveDemodulator.load()) {
|
||||||
lastBandwidth = lastActiveDemodulator->getBandwidth();
|
lastBandwidth = lastActiveDemodulator.load()->getBandwidth();
|
||||||
lastDemodType = lastActiveDemodulator->getDemodulatorType();
|
lastDemodType = lastActiveDemodulator.load()->getDemodulatorType();
|
||||||
lastDemodLock = lastActiveDemodulator->getDemodulatorLock()?true:false;
|
lastDemodLock = lastActiveDemodulator.load()->getDemodulatorLock()?true:false;
|
||||||
lastSquelchEnabled = lastActiveDemodulator->isSquelchEnabled();
|
lastSquelchEnabled = lastActiveDemodulator.load()->isSquelchEnabled();
|
||||||
lastSquelch = lastActiveDemodulator->getSquelchLevel();
|
lastSquelch = lastActiveDemodulator.load()->getSquelchLevel();
|
||||||
lastGain = lastActiveDemodulator->getGain();
|
lastGain = lastActiveDemodulator.load()->getGain();
|
||||||
lastModemSettings[lastDemodType] = lastActiveDemodulator->readModemSettings();
|
lastModemSettings[lastDemodType] = lastActiveDemodulator.load()->readModemSettings();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -53,15 +53,17 @@ public:
|
|||||||
void setLastModemSettings(std::string, ModemSettings);
|
void setLastModemSettings(std::string, ModemSettings);
|
||||||
|
|
||||||
void updateLastState();
|
void updateLastState();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
void garbageCollect();
|
void garbageCollect();
|
||||||
|
|
||||||
std::vector<DemodulatorInstance *> demods;
|
std::vector<DemodulatorInstance *> demods;
|
||||||
std::vector<DemodulatorInstance *> demods_deleted;
|
std::vector<DemodulatorInstance *> demods_deleted;
|
||||||
DemodulatorInstance *activeDemodulator;
|
|
||||||
DemodulatorInstance *lastActiveDemodulator;
|
std::atomic<DemodulatorInstance *> activeDemodulator;
|
||||||
DemodulatorInstance *activeVisualDemodulator;
|
std::atomic<DemodulatorInstance *> lastActiveDemodulator;
|
||||||
|
std::atomic<DemodulatorInstance *> activeVisualDemodulator;
|
||||||
|
|
||||||
int lastBandwidth;
|
int lastBandwidth;
|
||||||
std::string lastDemodType;
|
std::string lastDemodType;
|
||||||
|
@ -66,6 +66,7 @@ void DemodulatorPreThread::run() {
|
|||||||
|
|
||||||
while (!stopping) {
|
while (!stopping) {
|
||||||
DemodulatorThreadIQData *inp;
|
DemodulatorThreadIQData *inp;
|
||||||
|
|
||||||
iqInputQueue->pop(inp);
|
iqInputQueue->pop(inp);
|
||||||
|
|
||||||
if (frequencyChanged.load()) {
|
if (frequencyChanged.load()) {
|
||||||
@ -205,17 +206,20 @@ void DemodulatorPreThread::run() {
|
|||||||
resamp->modemKit = cModemKit;
|
resamp->modemKit = cModemKit;
|
||||||
resamp->sampleRate = currentBandwidth;
|
resamp->sampleRate = currentBandwidth;
|
||||||
|
|
||||||
iqOutputQueue->push(resamp);
|
if (!iqOutputQueue->push(resamp)) {
|
||||||
|
resamp->setRefCount(0);
|
||||||
|
std::cout << "DemodulatorPreThread::run() cannot push resamp into iqOutputQueue, is full !" << std::endl;
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inp->decRefCount();
|
inp->decRefCount();
|
||||||
|
|
||||||
if (!stopping && !workerResults->empty()) {
|
DemodulatorWorkerThreadResult result;
|
||||||
while (!workerResults->empty()) {
|
//process all worker results until
|
||||||
DemodulatorWorkerThreadResult result;
|
while (!stopping && workerResults->try_pop(result)) {
|
||||||
workerResults->pop(result);
|
|
||||||
|
switch (result.cmd) {
|
||||||
switch (result.cmd) {
|
|
||||||
case DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS:
|
case DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS:
|
||||||
if (result.iqResampler) {
|
if (result.iqResampler) {
|
||||||
if (iqResampler) {
|
if (iqResampler) {
|
||||||
@ -258,20 +262,19 @@ void DemodulatorPreThread::run() {
|
|||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
} //end while
|
||||||
|
|
||||||
if ((cModem != nullptr) && modemSettingsChanged.load()) {
|
if ((cModem != nullptr) && modemSettingsChanged.load()) {
|
||||||
cModem->writeSettings(modemSettingsBuffered);
|
cModem->writeSettings(modemSettingsBuffered);
|
||||||
modemSettingsBuffered.clear();
|
modemSettingsBuffered.clear();
|
||||||
modemSettingsChanged.store(false);
|
modemSettingsChanged.store(false);
|
||||||
}
|
}
|
||||||
}
|
} //end while stopping
|
||||||
|
|
||||||
while (!iqOutputQueue->empty()) {
|
DemodulatorThreadPostIQData *tmp;
|
||||||
DemodulatorThreadPostIQData *tmp;
|
while (iqOutputQueue->try_pop(tmp)) {
|
||||||
iqOutputQueue->pop(tmp);
|
|
||||||
tmp->decRefCount();
|
tmp->decRefCount();
|
||||||
}
|
}
|
||||||
buffers.purge();
|
buffers.purge();
|
||||||
@ -337,7 +340,10 @@ int DemodulatorPreThread::getAudioSampleRate() {
|
|||||||
void DemodulatorPreThread::terminate() {
|
void DemodulatorPreThread::terminate() {
|
||||||
IOThread::terminate();
|
IOThread::terminate();
|
||||||
DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue
|
DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue
|
||||||
iqInputQueue->push(inp);
|
if (!iqInputQueue->push(inp)) {
|
||||||
|
delete inp;
|
||||||
|
}
|
||||||
|
|
||||||
DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL);
|
DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL);
|
||||||
workerQueue->push(command);
|
workerQueue->push(command);
|
||||||
|
|
||||||
|
@ -74,6 +74,7 @@ void DemodulatorThread::run() {
|
|||||||
|
|
||||||
while (!stopping) {
|
while (!stopping) {
|
||||||
DemodulatorThreadPostIQData *inp;
|
DemodulatorThreadPostIQData *inp;
|
||||||
|
|
||||||
iqInputQueue->pop(inp);
|
iqInputQueue->pop(inp);
|
||||||
// std::lock_guard < std::mutex > lock(inp->m_mutex);
|
// std::lock_guard < std::mutex > lock(inp->m_mutex);
|
||||||
|
|
||||||
@ -238,55 +239,67 @@ void DemodulatorThread::run() {
|
|||||||
ati_vis->type = 0;
|
ati_vis->type = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
localAudioVisOutputQueue->push(ati_vis);
|
if (!localAudioVisOutputQueue->push(ati_vis)) {
|
||||||
|
ati_vis->setRefCount(0);
|
||||||
|
std::cout << "DemodulatorThread::run() cannot push ati_vis into localAudioVisOutputQueue, is full !" << std::endl;
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (ati != nullptr) {
|
if (ati != nullptr) {
|
||||||
if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) {
|
if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) {
|
||||||
audioOutputQueue->push(ati);
|
|
||||||
|
if (!audioOutputQueue->push(ati)) {
|
||||||
|
ati->decRefCount();
|
||||||
|
std::cout << "DemodulatorThread::run() cannot push ati into audioOutputQueue, is full !" << std::endl;
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
ati->setRefCount(0);
|
ati->setRefCount(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!threadQueueControl->empty()) {
|
DemodulatorThreadControlCommand command;
|
||||||
while (!threadQueueControl->empty()) {
|
|
||||||
DemodulatorThreadControlCommand command;
|
//empty command queue, execute commands
|
||||||
threadQueueControl->pop(command);
|
while (threadQueueControl->try_pop(command)) {
|
||||||
|
|
||||||
switch (command.cmd) {
|
switch (command.cmd) {
|
||||||
case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON:
|
case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON:
|
||||||
squelchEnabled = true;
|
squelchEnabled = true;
|
||||||
break;
|
break;
|
||||||
case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_OFF:
|
case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_OFF:
|
||||||
squelchEnabled = false;
|
squelchEnabled = false;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
inp->decRefCount();
|
inp->decRefCount();
|
||||||
}
|
}
|
||||||
// end while !stopping
|
// end while !stopping
|
||||||
|
|
||||||
// Purge any unused inputs
|
// Purge any unused inputs, with a non-blocking pop
|
||||||
while (!iqInputQueue->empty()) {
|
DemodulatorThreadPostIQData *ref;
|
||||||
DemodulatorThreadPostIQData *ref;
|
while (iqInputQueue->try_pop(ref)) {
|
||||||
iqInputQueue->pop(ref);
|
|
||||||
if (ref) { // May have other consumers; just decrement
|
if (ref) { // May have other consumers; just decrement
|
||||||
ref->decRefCount();
|
ref->decRefCount();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while (!audioOutputQueue->empty()) {
|
|
||||||
AudioThreadInput *ref;
|
AudioThreadInput *ref_audio;
|
||||||
audioOutputQueue->pop(ref);
|
while (audioOutputQueue->try_pop(ref_audio)) {
|
||||||
if (ref) { // Originated here; set RefCount to 0
|
|
||||||
ref->setRefCount(0);
|
if (ref_audio) { // Originated here; set RefCount to 0
|
||||||
|
ref_audio->setRefCount(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
outputBuffers.purge();
|
outputBuffers.purge();
|
||||||
|
|
||||||
// std::cout << "Demodulator thread done." << std::endl;
|
// std::cout << "Demodulator thread done." << std::endl;
|
||||||
@ -295,7 +308,9 @@ void DemodulatorThread::run() {
|
|||||||
void DemodulatorThread::terminate() {
|
void DemodulatorThread::terminate() {
|
||||||
IOThread::terminate();
|
IOThread::terminate();
|
||||||
DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue
|
DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue
|
||||||
iqInputQueue->push(inp);
|
if (!iqInputQueue->push(inp)) {
|
||||||
|
delete inp;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DemodulatorThread::isMuted() {
|
bool DemodulatorThread::isMuted() {
|
||||||
|
@ -24,8 +24,12 @@ void DemodulatorWorkerThread::run() {
|
|||||||
DemodulatorWorkerThreadCommand command;
|
DemodulatorWorkerThreadCommand command;
|
||||||
|
|
||||||
bool done = false;
|
bool done = false;
|
||||||
|
//Beware of the subtility here,
|
||||||
|
//we are waiting for the first command to show up (blocking!)
|
||||||
|
//then consuming the commands until done.
|
||||||
while (!done) {
|
while (!done) {
|
||||||
commandQueue->pop(command);
|
commandQueue->pop(command);
|
||||||
|
|
||||||
switch (command.cmd) {
|
switch (command.cmd) {
|
||||||
case DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS:
|
case DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS:
|
||||||
filterChanged = true;
|
filterChanged = true;
|
||||||
|
@ -314,7 +314,7 @@ void SDRDevicesDialog::OnUseSelected( wxMouseEvent& event) {
|
|||||||
devConfig->setStreamOpts(streamArgs);
|
devConfig->setStreamOpts(streamArgs);
|
||||||
wxGetApp().setDeviceArgs(settingArgs);
|
wxGetApp().setDeviceArgs(settingArgs);
|
||||||
wxGetApp().setStreamArgs(streamArgs);
|
wxGetApp().setStreamArgs(streamArgs);
|
||||||
wxGetApp().setDevice(dev);
|
wxGetApp().setDevice(dev,0);
|
||||||
|
|
||||||
Close();
|
Close();
|
||||||
}
|
}
|
||||||
@ -483,7 +483,7 @@ void SDRDevicesDialog::doRefreshDevices() {
|
|||||||
editId = nullptr;
|
editId = nullptr;
|
||||||
removeId = nullptr;
|
removeId = nullptr;
|
||||||
dev = nullptr;
|
dev = nullptr;
|
||||||
wxGetApp().stopDevice(false);
|
wxGetApp().stopDevice(false, 2000);
|
||||||
devTree->DeleteAllItems();
|
devTree->DeleteAllItems();
|
||||||
devTree->Disable();
|
devTree->Disable();
|
||||||
m_propertyGrid->Clear();
|
m_propertyGrid->Clear();
|
||||||
|
@ -17,6 +17,7 @@ unsigned int FFTDataDistributor::getLinesPerSecond() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void FFTDataDistributor::process() {
|
void FFTDataDistributor::process() {
|
||||||
|
|
||||||
while (!input->empty()) {
|
while (!input->empty()) {
|
||||||
if (!isAnyOutputEmpty()) {
|
if (!isAnyOutputEmpty()) {
|
||||||
return;
|
return;
|
||||||
|
@ -68,10 +68,10 @@ void ScopeVisualProcessor::process() {
|
|||||||
if (!isOutputEmpty()) {
|
if (!isOutputEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!input->empty()) {
|
AudioThreadInput *audioInputData;
|
||||||
AudioThreadInput *audioInputData;
|
|
||||||
input->pop(audioInputData);
|
if (input->try_pop(audioInputData)) {
|
||||||
|
|
||||||
if (!audioInputData) {
|
if (!audioInputData) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -271,5 +271,5 @@ void ScopeVisualProcessor::process() {
|
|||||||
} else {
|
} else {
|
||||||
delete audioInputData; //->decRefCount();
|
delete audioInputData; //->decRefCount();
|
||||||
}
|
}
|
||||||
}
|
} //end if try_pop()
|
||||||
}
|
}
|
||||||
|
@ -86,11 +86,10 @@ protected:
|
|||||||
|
|
||||||
output->setRefCount(outputs.size());
|
output->setRefCount(outputs.size());
|
||||||
for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) {
|
for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) {
|
||||||
if ((*outputs_i)->full()) {
|
|
||||||
|
if (!(*outputs_i)->push(output)) {
|
||||||
output->decRefCount();
|
output->decRefCount();
|
||||||
} else {
|
}
|
||||||
(*outputs_i)->push(output);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,12 +106,16 @@ template<class OutputDataType = ReferenceCounter>
|
|||||||
class VisualDataDistributor : public VisualProcessor<OutputDataType, OutputDataType> {
|
class VisualDataDistributor : public VisualProcessor<OutputDataType, OutputDataType> {
|
||||||
protected:
|
protected:
|
||||||
void process() {
|
void process() {
|
||||||
while (!VisualProcessor<OutputDataType, OutputDataType>::input->empty()) {
|
OutputDataType *inp;
|
||||||
|
while (VisualProcessor<OutputDataType, OutputDataType>::input->try_pop(inp)) {
|
||||||
|
|
||||||
if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) {
|
if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) {
|
||||||
|
if (inp) {
|
||||||
|
inp->decRefCount();
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
OutputDataType *inp;
|
|
||||||
VisualProcessor<OutputDataType, OutputDataType>::input->pop(inp);
|
|
||||||
if (inp) {
|
if (inp) {
|
||||||
VisualProcessor<OutputDataType, OutputDataType>::distribute(inp);
|
VisualProcessor<OutputDataType, OutputDataType>::distribute(inp);
|
||||||
}
|
}
|
||||||
@ -125,12 +128,15 @@ template<class OutputDataType = ReferenceCounter>
|
|||||||
class VisualDataReDistributor : public VisualProcessor<OutputDataType, OutputDataType> {
|
class VisualDataReDistributor : public VisualProcessor<OutputDataType, OutputDataType> {
|
||||||
protected:
|
protected:
|
||||||
void process() {
|
void process() {
|
||||||
while (!VisualProcessor<OutputDataType, OutputDataType>::input->empty()) {
|
OutputDataType *inp;
|
||||||
|
while (VisualProcessor<OutputDataType, OutputDataType>::input->try_pop(inp)) {
|
||||||
|
|
||||||
if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) {
|
if (!VisualProcessor<OutputDataType, OutputDataType>::isAnyOutputEmpty()) {
|
||||||
|
if (inp) {
|
||||||
|
inp->decRefCount();
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
OutputDataType *inp;
|
|
||||||
VisualProcessor<OutputDataType, OutputDataType>::input->pop(inp);
|
|
||||||
|
|
||||||
if (inp) {
|
if (inp) {
|
||||||
OutputDataType *outp = buffers.getBuffer();
|
OutputDataType *outp = buffers.getBuffer();
|
||||||
|
@ -86,7 +86,7 @@ void SDRPostThread::updateActiveDemodulators() {
|
|||||||
nRunDemods = 0;
|
nRunDemods = 0;
|
||||||
|
|
||||||
long long centerFreq = wxGetApp().getFrequency();
|
long long centerFreq = wxGetApp().getFrequency();
|
||||||
|
|
||||||
for (demod_i = demodulators.begin(); demod_i != demodulators.end(); demod_i++) {
|
for (demod_i = demodulators.begin(); demod_i != demodulators.end(); demod_i++) {
|
||||||
DemodulatorInstance *demod = *demod_i;
|
DemodulatorInstance *demod = *demod_i;
|
||||||
DemodulatorThreadInputQueue *demodQueue = demod->getIQInputDataPipe();
|
DemodulatorThreadInputQueue *demodQueue = demod->getIQInputDataPipe();
|
||||||
@ -108,7 +108,9 @@ void SDRPostThread::updateActiveDemodulators() {
|
|||||||
DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData;
|
DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData;
|
||||||
dummyDataOut->frequency = frequency;
|
dummyDataOut->frequency = frequency;
|
||||||
dummyDataOut->sampleRate = sampleRate;
|
dummyDataOut->sampleRate = sampleRate;
|
||||||
demodQueue->push(dummyDataOut);
|
if (!demodQueue->push(dummyDataOut)) {
|
||||||
|
delete dummyDataOut;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// follow if follow mode
|
// follow if follow mode
|
||||||
@ -119,6 +121,7 @@ void SDRPostThread::updateActiveDemodulators() {
|
|||||||
} else if (!demod->isActive()) { // in range, activate if not activated
|
} else if (!demod->isActive()) { // in range, activate if not activated
|
||||||
demod->setActive(true);
|
demod->setActive(true);
|
||||||
if (wxGetApp().getDemodMgr().getLastActiveDemodulator() == NULL) {
|
if (wxGetApp().getDemodMgr().getLastActiveDemodulator() == NULL) {
|
||||||
|
|
||||||
wxGetApp().getDemodMgr().setActiveDemodulator(demod);
|
wxGetApp().getDemodMgr().setActiveDemodulator(demod);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -181,8 +184,6 @@ void SDRPostThread::run() {
|
|||||||
iqDataOutQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQDataOutput"));
|
iqDataOutQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQDataOutput"));
|
||||||
iqVisualQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQVisualDataOutput"));
|
iqVisualQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQVisualDataOutput"));
|
||||||
iqActiveDemodVisualQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQActiveDemodVisualDataOutput"));
|
iqActiveDemodVisualQueue = static_cast<DemodulatorThreadInputQueue*>(getOutputQueue("IQActiveDemodVisualDataOutput"));
|
||||||
|
|
||||||
iqDataInQueue->set_max_num_items(0);
|
|
||||||
|
|
||||||
while (!stopping) {
|
while (!stopping) {
|
||||||
SDRThreadIQData *data_in;
|
SDRThreadIQData *data_in;
|
||||||
@ -212,14 +213,16 @@ void SDRPostThread::run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Only update the list of demodulators here
|
||||||
if (doUpdate) {
|
if (doUpdate) {
|
||||||
updateActiveDemodulators();
|
updateActiveDemodulators();
|
||||||
}
|
}
|
||||||
}
|
} //end while
|
||||||
|
|
||||||
if (iqVisualQueue && !iqVisualQueue->empty()) {
|
//Be safe, remove as many elements as possible
|
||||||
DemodulatorThreadIQData *visualDataDummy;
|
DemodulatorThreadIQData *visualDataDummy;
|
||||||
iqVisualQueue->pop(visualDataDummy);
|
while (iqVisualQueue && iqVisualQueue->try_pop(visualDataDummy)) {
|
||||||
|
visualDataDummy->decRefCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
// buffers.purge();
|
// buffers.purge();
|
||||||
@ -231,7 +234,9 @@ void SDRPostThread::run() {
|
|||||||
void SDRPostThread::terminate() {
|
void SDRPostThread::terminate() {
|
||||||
IOThread::terminate();
|
IOThread::terminate();
|
||||||
SDRThreadIQData *dummy = new SDRThreadIQData;
|
SDRThreadIQData *dummy = new SDRThreadIQData;
|
||||||
iqDataInQueue->push(dummy);
|
if (!iqDataInQueue->push(dummy)) {
|
||||||
|
delete dummy;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
|
void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
|
||||||
@ -292,19 +297,34 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
|
|||||||
iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], dataSize, &demodDataOut->data[0]);
|
iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], dataSize, &demodDataOut->data[0]);
|
||||||
|
|
||||||
if (doDemodVisOut) {
|
if (doDemodVisOut) {
|
||||||
iqActiveDemodVisualQueue->push(demodDataOut);
|
if (!iqActiveDemodVisualQueue->push(demodDataOut)) {
|
||||||
|
demodDataOut->decRefCount();
|
||||||
|
std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl;
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (doIQDataOut) {
|
if (doIQDataOut) {
|
||||||
iqDataOutQueue->push(demodDataOut);
|
if (!iqDataOutQueue->push(demodDataOut)) {
|
||||||
|
demodDataOut->decRefCount();
|
||||||
|
std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqDataOutQueue, is full !" << std::endl;
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (doVisOut) {
|
if (doVisOut) {
|
||||||
iqVisualQueue->push(demodDataOut);
|
if (!iqVisualQueue->push(demodDataOut)) {
|
||||||
|
demodDataOut->decRefCount();
|
||||||
|
std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqVisualQueue, is full !" << std::endl;
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (size_t i = 0; i < nRunDemods; i++) {
|
for (size_t i = 0; i < nRunDemods; i++) {
|
||||||
runDemods[i]->getIQInputDataPipe()->push(demodDataOut);
|
if (!runDemods[i]->getIQInputDataPipe()->push(demodDataOut)) {
|
||||||
|
demodDataOut->decRefCount();
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -342,9 +362,19 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
|
|||||||
iqDataOut->sampleRate = data_in->sampleRate;
|
iqDataOut->sampleRate = data_in->sampleRate;
|
||||||
iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize);
|
iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize);
|
||||||
|
|
||||||
iqDataOutQueue->push(iqDataOut);
|
if (!iqDataOutQueue->push(iqDataOut)) {
|
||||||
if (doVis) {
|
std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqDataOutQueue, is full !" << std::endl;
|
||||||
iqVisualQueue->push(iqDataOut);
|
iqDataOut->decRefCount();
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (doVis) {
|
||||||
|
if (!iqVisualQueue->push(iqDataOut)) {
|
||||||
|
std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqVisualQueue, is full !" << std::endl;
|
||||||
|
iqDataOut->decRefCount();
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -440,13 +470,21 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (doDemodVis) {
|
if (doDemodVis) {
|
||||||
iqActiveDemodVisualQueue->push(demodDataOut);
|
if (!iqActiveDemodVisualQueue->push(demodDataOut)) {
|
||||||
|
std::cout << "SDRPostThread::runPFBCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl;
|
||||||
|
demodDataOut->decRefCount();
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (size_t j = 0; j < nRunDemods; j++) {
|
for (size_t j = 0; j < nRunDemods; j++) {
|
||||||
if (demodChannel[j] == i) {
|
if (demodChannel[j] == i) {
|
||||||
DemodulatorInstance *demod = runDemods[j];
|
DemodulatorInstance *demod = runDemods[j];
|
||||||
demod->getIQInputDataPipe()->push(demodDataOut);
|
|
||||||
|
if (!demod->getIQInputDataPipe()->push(demodDataOut)) {
|
||||||
|
demodDataOut->decRefCount();
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -162,6 +162,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
|||||||
int nElems = numElems.load();
|
int nElems = numElems.load();
|
||||||
int mtElems = mtuElems.load();
|
int mtElems = mtuElems.load();
|
||||||
|
|
||||||
|
//If overflow occured on the previous readStream(), transfer it in inpBuffer now
|
||||||
if (numOverflow > 0) {
|
if (numOverflow > 0) {
|
||||||
int n_overflow = numOverflow;
|
int n_overflow = numOverflow;
|
||||||
if (n_overflow > nElems) {
|
if (n_overflow > nElems) {
|
||||||
@ -176,9 +177,18 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//attempt readStream() at most nElems, by mtElems-sized chunks, append inpBuffer.
|
||||||
while (n_read < nElems && !stopping) {
|
while (n_read < nElems && !stopping) {
|
||||||
int n_requested = nElems-n_read;
|
int n_requested = nElems-n_read;
|
||||||
|
|
||||||
int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs);
|
int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs);
|
||||||
|
|
||||||
|
//if the n_stream_read <= 0, bail out from reading.
|
||||||
|
if (n_stream_read <= 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
//sucess read beyond nElems, with overflow
|
||||||
if ((n_read + n_stream_read) > nElems) {
|
if ((n_read + n_stream_read) > nElems) {
|
||||||
memcpy(&inpBuffer.data[n_read], buffs[0], n_requested * sizeof(float) * 2);
|
memcpy(&inpBuffer.data[n_read], buffs[0], n_requested * sizeof(float) * 2);
|
||||||
numOverflow = n_stream_read-n_requested;
|
numOverflow = n_stream_read-n_requested;
|
||||||
@ -194,7 +204,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (n_read > 0 && !stopping) {
|
if (n_read > 0 && !stopping && !iqDataOutQueue->full()) {
|
||||||
SDRThreadIQData *dataOut = buffers.getBuffer();
|
SDRThreadIQData *dataOut = buffers.getBuffer();
|
||||||
|
|
||||||
if (iq_swap.load()) {
|
if (iq_swap.load()) {
|
||||||
@ -212,7 +222,16 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
|||||||
dataOut->dcCorrected = hasHardwareDC.load();
|
dataOut->dcCorrected = hasHardwareDC.load();
|
||||||
dataOut->numChannels = numChannels.load();
|
dataOut->numChannels = numChannels.load();
|
||||||
|
|
||||||
iqDataOutQueue->push(dataOut);
|
if (!iqDataOutQueue->push(dataOut)) {
|
||||||
|
//The rest of the system saturates,
|
||||||
|
//finally the push didn't suceeded, recycle dataOut immediatly.
|
||||||
|
dataOut->setRefCount(0);
|
||||||
|
|
||||||
|
std::cout << "SDRThread::readStream(): iqDataOutQueue output queue is full, discard processing ! " << std::endl;
|
||||||
|
|
||||||
|
//saturation, let a chance to the other threads to consume the existing samples
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,11 +68,13 @@ public:
|
|||||||
bool push(const value_type& item) {
|
bool push(const value_type& item) {
|
||||||
std::lock_guard < std::mutex > lock(m_mutex);
|
std::lock_guard < std::mutex > lock(m_mutex);
|
||||||
|
|
||||||
if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load())
|
if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) {
|
||||||
|
m_condition.notify_all();
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
m_queue.push(item);
|
m_queue.push(item);
|
||||||
m_condition.notify_one();
|
m_condition.notify_all();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,11 +86,13 @@ public:
|
|||||||
bool push(const value_type&& item) {
|
bool push(const value_type&& item) {
|
||||||
std::lock_guard < std::mutex > lock(m_mutex);
|
std::lock_guard < std::mutex > lock(m_mutex);
|
||||||
|
|
||||||
if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load())
|
if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) {
|
||||||
|
m_condition.notify_all();
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
m_queue.push(item);
|
m_queue.push(item);
|
||||||
m_condition.notify_one();
|
m_condition.notify_all();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,11 +97,10 @@ bool ScopeCanvas::getShowDb() {
|
|||||||
void ScopeCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) {
|
void ScopeCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) {
|
||||||
wxPaintDC dc(this);
|
wxPaintDC dc(this);
|
||||||
const wxSize ClientSize = GetClientSize();
|
const wxSize ClientSize = GetClientSize();
|
||||||
|
|
||||||
while (!inputData.empty()) {
|
ScopeRenderData *avData;
|
||||||
ScopeRenderData *avData;
|
while (inputData.try_pop(avData)) {
|
||||||
inputData.pop(avData);
|
|
||||||
|
|
||||||
|
|
||||||
if (!avData->spectrum) {
|
if (!avData->spectrum) {
|
||||||
scopePanel.setMode(avData->mode);
|
scopePanel.setMode(avData->mode);
|
||||||
|
@ -51,11 +51,9 @@ void SpectrumCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) {
|
|||||||
wxPaintDC dc(this);
|
wxPaintDC dc(this);
|
||||||
const wxSize ClientSize = GetClientSize();
|
const wxSize ClientSize = GetClientSize();
|
||||||
|
|
||||||
if (!visualDataQueue.empty()) {
|
SpectrumVisualData *vData;
|
||||||
SpectrumVisualData *vData;
|
if (visualDataQueue.try_pop(vData)) {
|
||||||
|
|
||||||
visualDataQueue.pop(vData);
|
|
||||||
|
|
||||||
if (vData) {
|
if (vData) {
|
||||||
spectrumPanel.setPoints(vData->spectrum_points);
|
spectrumPanel.setPoints(vData->spectrum_points);
|
||||||
spectrumPanel.setPeakPoints(vData->spectrum_hold_points);
|
spectrumPanel.setPeakPoints(vData->spectrum_hold_points);
|
||||||
|
@ -95,8 +95,8 @@ void WaterfallCanvas::processInputQueue() {
|
|||||||
if (lpsIndex >= targetVis) {
|
if (lpsIndex >= targetVis) {
|
||||||
while (lpsIndex >= targetVis) {
|
while (lpsIndex >= targetVis) {
|
||||||
SpectrumVisualData *vData;
|
SpectrumVisualData *vData;
|
||||||
if (!visualDataQueue.empty()) {
|
|
||||||
visualDataQueue.pop(vData);
|
if (visualDataQueue.try_pop(vData)) {
|
||||||
|
|
||||||
if (vData) {
|
if (vData) {
|
||||||
if (vData->spectrum_points.size() == fft_size * 2) {
|
if (vData->spectrum_points.size() == fft_size * 2) {
|
||||||
@ -912,11 +912,13 @@ void WaterfallCanvas::updateCenterFrequency(long long freq) {
|
|||||||
|
|
||||||
void WaterfallCanvas::setLinesPerSecond(int lps) {
|
void WaterfallCanvas::setLinesPerSecond(int lps) {
|
||||||
std::lock_guard < std::mutex > lock(tex_update);
|
std::lock_guard < std::mutex > lock(tex_update);
|
||||||
|
|
||||||
linesPerSecond = lps;
|
linesPerSecond = lps;
|
||||||
while (!visualDataQueue.empty()) {
|
|
||||||
SpectrumVisualData *vData;
|
|
||||||
visualDataQueue.pop(vData);
|
|
||||||
|
|
||||||
|
//empty all
|
||||||
|
SpectrumVisualData *vData;
|
||||||
|
while (visualDataQueue.try_pop(vData)) {
|
||||||
|
|
||||||
if (vData) {
|
if (vData) {
|
||||||
vData->decRefCount();
|
vData->decRefCount();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user