DemodulatorThread convert to IOThread named queues

This commit is contained in:
Charles J. Cliffe 2015-07-30 00:28:53 -04:00
parent d53aabf73a
commit 0a9dd7692a
5 changed files with 39 additions and 41 deletions

View File

@ -1,7 +1,7 @@
#include "DemodulatorInstance.h"
DemodulatorInstance::DemodulatorInstance() :
t_Demod(NULL), t_PreDemod(NULL), t_Audio(NULL), threadQueueDemod(NULL), demodulatorThread(NULL), currentAudioGain(1.0) {
threadQueueDemod(NULL), demodulatorThread(NULL), t_PreDemod(NULL), t_Demod(NULL), t_Audio(NULL), currentAudioGain(1.0) {
terminated.store(true);
audioTerminated.store(true);
@ -24,7 +24,7 @@ DemodulatorInstance::DemodulatorInstance() :
threadQueueCommand = new DemodulatorThreadCommandQueue;
threadQueueNotify = new DemodulatorThreadCommandQueue;
threadQueueControl = new DemodulatorThreadControlCommandQueue;
demodulatorPreThread = new DemodulatorPreThread();
demodulatorPreThread->setInputQueue("IQDataInput",threadQueueDemod);
demodulatorPreThread->setOutputQueue("IQDataOut",threadQueuePostDemod);
@ -32,13 +32,15 @@ DemodulatorInstance::DemodulatorInstance() :
demodulatorPreThread->setOutputQueue("NotifyQueue",threadQueueNotify);
demodulatorPreThread->setInputQueue("CommandQueue",threadQueueCommand);
demodulatorThread = new DemodulatorThread(threadQueuePostDemod, threadQueueControl, threadQueueNotify);
audioInputQueue = new AudioThreadInputQueue;
demodulatorThread = new DemodulatorThread();
demodulatorThread->setInputQueue("IQDataInput",threadQueuePostDemod);
demodulatorThread->setInputQueue("ControlQueue",threadQueueControl);
demodulatorThread->setOutputQueue("NotifyQueue",threadQueueNotify);
demodulatorThread->setOutputQueue("AudioDataOut", audioInputQueue);
audioThread = new AudioThread(audioInputQueue, threadQueueNotify);
demodulatorThread->setAudioOutputQueue(audioInputQueue);
currentDemodType = demodulatorThread->getDemodulatorType();
}

View File

@ -8,7 +8,8 @@
#include "DemodulatorPreThread.h"
#include "CubicSDR.h"
DemodulatorPreThread::DemodulatorPreThread() : IOThread(), iqResampler(NULL), iqResampleRatio(1), audioResampler(NULL), stereoResampler(NULL), audioResampleRatio(1), firStereoLeft(NULL), firStereoRight(NULL), iirStereoPilot(NULL) {
DemodulatorPreThread::DemodulatorPreThread() : IOThread(), iqResampler(NULL), iqResampleRatio(1), audioResampler(NULL), stereoResampler(NULL), audioResampleRatio(1), firStereoLeft(NULL), firStereoRight(NULL), iirStereoPilot(NULL), iqInputQueue(NULL), iqOutputQueue(NULL), threadQueueNotify(NULL), commandQueue(NULL)
{
initialized.store(false);
freqShifter = nco_crcf_create(LIQUID_VCO);
@ -91,11 +92,10 @@ void DemodulatorPreThread::run() {
ReBuffer<DemodulatorThreadPostIQData> buffers;
DemodulatorThreadInputQueue* iqInputQueue = (DemodulatorThreadInputQueue*)getInputQueue("IQDataInput");
DemodulatorThreadPostInputQueue* iqOutputQueue = (DemodulatorThreadPostInputQueue*)getOutputQueue("IQDataOut");
DemodulatorThreadControlCommandQueue *threadQueueControl = (DemodulatorThreadControlCommandQueue *)getInputQueue("ControlQueue");
DemodulatorThreadCommandQueue* threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue");
DemodulatorThreadCommandQueue* commandQueue = ( DemodulatorThreadCommandQueue*)getInputQueue("CommandQueue");
iqInputQueue = (DemodulatorThreadInputQueue*)getInputQueue("IQDataInput");
iqOutputQueue = (DemodulatorThreadPostInputQueue*)getOutputQueue("IQDataOut");
threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue");
commandQueue = ( DemodulatorThreadCommandQueue*)getInputQueue("CommandQueue");
std::vector<liquid_float_complex> in_buf_data;
std::vector<liquid_float_complex> out_buf_data;
@ -310,12 +310,6 @@ void DemodulatorPreThread::run() {
buffers.purge();
DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue
iqInputQueue->push(inp);
workerThread->terminate();
t_Worker->detach();
delete t_Worker;
DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED);
tCmd.context = this;
threadQueueNotify->push(tCmd);
@ -324,4 +318,9 @@ void DemodulatorPreThread::run() {
void DemodulatorPreThread::terminate() {
terminated = true;
DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue
iqInputQueue->push(inp);
workerThread->terminate();
t_Worker->detach();
delete t_Worker;
}

View File

@ -58,4 +58,9 @@ protected:
DemodulatorThreadWorkerCommandQueue *workerQueue;
DemodulatorThreadWorkerResultQueue *workerResults;
DemodulatorThreadInputQueue* iqInputQueue;
DemodulatorThreadPostInputQueue* iqOutputQueue;
DemodulatorThreadCommandQueue* threadQueueNotify;
DemodulatorThreadCommandQueue* commandQueue;
};

View File

@ -11,11 +11,7 @@
#include <pthread.h>
#endif
DemodulatorThread::DemodulatorThread(DemodulatorThreadPostInputQueue* iqInputQueue, DemodulatorThreadControlCommandQueue *threadQueueControl,
DemodulatorThreadCommandQueue* threadQueueNotify) : IOThread(),
iqInputQueue(iqInputQueue), audioVisOutputQueue(NULL), audioOutputQueue(NULL), iqAutoGain(NULL), amOutputCeil(1), amOutputCeilMA(1), amOutputCeilMAA(
1), threadQueueNotify(threadQueueNotify), threadQueueControl(threadQueueControl), squelchLevel(0), signalLevel(
0), squelchEnabled(false), audioSampleRate(0) {
DemodulatorThread::DemodulatorThread() : IOThread(), iqAutoGain(NULL), amOutputCeil(1), amOutputCeilMA(1), amOutputCeilMAA(1), audioSampleRate(0), squelchLevel(0), signalLevel(0), squelchEnabled(false), iqInputQueue(NULL), audioOutputQueue(NULL), audioVisOutputQueue(NULL), threadQueueControl(NULL), threadQueueNotify(NULL) {
stereo.store(false);
agcEnabled.store(false);
@ -67,6 +63,11 @@ void DemodulatorThread::run() {
std::cout << "Demodulator thread started.." << std::endl;
iqInputQueue = (DemodulatorThreadPostInputQueue*)getInputQueue("IQDataInput");
audioOutputQueue = (AudioThreadInputQueue*)getOutputQueue("AudioDataOut");
threadQueueControl = (DemodulatorThreadControlCommandQueue *)getInputQueue("ControlQueue");
threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue");
switch (demodulatorType.load()) {
case DEMOD_TYPE_FM:
break;
@ -485,21 +486,14 @@ void DemodulatorThread::run() {
DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_TERMINATED);
tCmd.context = this;
threadQueueNotify->push(tCmd);
std::cout << "Demodulator thread done." << std::endl;
#ifdef __APPLE__
return this;
#endif
}
void DemodulatorThread::setVisualOutputQueue(DemodulatorThreadOutputQueue *tQueue) {
audioVisOutputQueue = tQueue;
}
void DemodulatorThread::setAudioOutputQueue(AudioThreadInputQueue *tQueue) {
audioOutputQueue = tQueue;
}
void DemodulatorThread::terminate() {
terminated = true;
DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue

View File

@ -13,14 +13,12 @@ typedef ThreadQueue<AudioThreadInput *> DemodulatorThreadOutputQueue;
class DemodulatorThread : public IOThread {
public:
DemodulatorThread(DemodulatorThreadPostInputQueue* iqInputQueue, DemodulatorThreadControlCommandQueue *threadQueueControl,
DemodulatorThreadCommandQueue* threadQueueNotify);
DemodulatorThread();
~DemodulatorThread();
void run();
void setVisualOutputQueue(DemodulatorThreadOutputQueue *tQueue);
void setAudioOutputQueue(AudioThreadInputQueue *tQueue);
void terminate();
@ -53,10 +51,6 @@ protected:
std::vector<float> resampledOutputData;
std::vector<float> resampledStereoData;
DemodulatorThreadPostInputQueue* iqInputQueue;
DemodulatorThreadOutputQueue* audioVisOutputQueue;
AudioThreadInputQueue *audioOutputQueue;
freqdem demodFM;
ampmodem demodAM;
ampmodem demodAM_DSB_CSP;
@ -75,9 +69,13 @@ protected:
std::atomic_int demodulatorType;
int audioSampleRate;
DemodulatorThreadCommandQueue* threadQueueNotify;
DemodulatorThreadControlCommandQueue *threadQueueControl;
std::atomic<float> squelchLevel;
std::atomic<float> signalLevel;
bool squelchEnabled;
DemodulatorThreadPostInputQueue* iqInputQueue;
AudioThreadInputQueue *audioOutputQueue;
DemodulatorThreadOutputQueue* audioVisOutputQueue;
DemodulatorThreadControlCommandQueue *threadQueueControl;
DemodulatorThreadCommandQueue* threadQueueNotify;
};