DemodulatorPreThread convert to IOThread named queues

This commit is contained in:
Charles J. Cliffe 2015-07-29 22:52:54 -04:00
parent 0d66c92f30
commit d53aabf73a
5 changed files with 27 additions and 32 deletions

View File

@ -10,6 +10,7 @@ IOThread::~IOThread() {
#ifdef __APPLE__ #ifdef __APPLE__
void *IOThread::threadMain() { void *IOThread::threadMain() {
terminated.store(false);
run(); run();
return this; return this;
}; };
@ -19,6 +20,7 @@ void *IOThread::pthread_helper(void *context) {
}; };
#else #else
void IOThread::threadMain() { void IOThread::threadMain() {
terminated.store(false);
run(); run();
}; };
#endif #endif

View File

@ -71,7 +71,7 @@ private:
class IOThread { class IOThread {
public: public:
IOThread(); IOThread();
~IOThread(); virtual ~IOThread();
static void *pthread_helper(void *context); static void *pthread_helper(void *context);

View File

@ -25,8 +25,13 @@ DemodulatorInstance::DemodulatorInstance() :
threadQueueNotify = new DemodulatorThreadCommandQueue; threadQueueNotify = new DemodulatorThreadCommandQueue;
threadQueueControl = new DemodulatorThreadControlCommandQueue; threadQueueControl = new DemodulatorThreadControlCommandQueue;
demodulatorPreThread = new DemodulatorPreThread(threadQueueDemod, threadQueuePostDemod, threadQueueControl, threadQueueNotify); demodulatorPreThread = new DemodulatorPreThread();
demodulatorPreThread->setCommandQueue(threadQueueCommand); demodulatorPreThread->setInputQueue("IQDataInput",threadQueueDemod);
demodulatorPreThread->setOutputQueue("IQDataOut",threadQueuePostDemod);
demodulatorPreThread->setInputQueue("ControlQueue",threadQueueControl);
demodulatorPreThread->setOutputQueue("NotifyQueue",threadQueueNotify);
demodulatorPreThread->setInputQueue("CommandQueue",threadQueueCommand);
demodulatorThread = new DemodulatorThread(threadQueuePostDemod, threadQueueControl, threadQueueNotify); demodulatorThread = new DemodulatorThread(threadQueuePostDemod, threadQueueControl, threadQueueNotify);
audioInputQueue = new AudioThreadInputQueue; audioInputQueue = new AudioThreadInputQueue;

View File

@ -8,11 +8,7 @@
#include "DemodulatorPreThread.h" #include "DemodulatorPreThread.h"
#include "CubicSDR.h" #include "CubicSDR.h"
DemodulatorPreThread::DemodulatorPreThread(DemodulatorThreadInputQueue* iqInputQueue, DemodulatorThreadPostInputQueue* iqOutputQueue, DemodulatorPreThread::DemodulatorPreThread() : IOThread(), iqResampler(NULL), iqResampleRatio(1), audioResampler(NULL), stereoResampler(NULL), audioResampleRatio(1), firStereoLeft(NULL), firStereoRight(NULL), iirStereoPilot(NULL) {
DemodulatorThreadControlCommandQueue *threadQueueControl, DemodulatorThreadCommandQueue* threadQueueNotify) : IOThread(),
iqInputQueue(iqInputQueue), iqOutputQueue(iqOutputQueue), audioResampler(NULL), stereoResampler(NULL), iqResampleRatio(
1), audioResampleRatio(1), firStereoRight(NULL), firStereoLeft(NULL), iirStereoPilot(NULL), iqResampler(NULL), commandQueue(NULL), threadQueueNotify(threadQueueNotify), threadQueueControl(
threadQueueControl) {
initialized.store(false); initialized.store(false);
freqShifter = nco_crcf_create(LIQUID_VCO); freqShifter = nco_crcf_create(LIQUID_VCO);
@ -21,8 +17,6 @@ DemodulatorPreThread::DemodulatorPreThread(DemodulatorThreadInputQueue* iqInputQ
workerQueue = new DemodulatorThreadWorkerCommandQueue; workerQueue = new DemodulatorThreadWorkerCommandQueue;
workerResults = new DemodulatorThreadWorkerResultQueue; workerResults = new DemodulatorThreadWorkerResultQueue;
workerThread = new DemodulatorWorkerThread(workerQueue, workerResults); workerThread = new DemodulatorWorkerThread(workerQueue, workerResults);
t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread);
} }
void DemodulatorPreThread::initialize() { void DemodulatorPreThread::initialize() {
@ -93,8 +87,16 @@ void DemodulatorPreThread::run() {
std::cout << "Demodulator preprocessor thread started.." << std::endl; std::cout << "Demodulator preprocessor thread started.." << std::endl;
t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread);
ReBuffer<DemodulatorThreadPostIQData> buffers; ReBuffer<DemodulatorThreadPostIQData> buffers;
DemodulatorThreadInputQueue* iqInputQueue = (DemodulatorThreadInputQueue*)getInputQueue("IQDataInput");
DemodulatorThreadPostInputQueue* iqOutputQueue = (DemodulatorThreadPostInputQueue*)getOutputQueue("IQDataOut");
DemodulatorThreadControlCommandQueue *threadQueueControl = (DemodulatorThreadControlCommandQueue *)getInputQueue("ControlQueue");
DemodulatorThreadCommandQueue* threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue");
DemodulatorThreadCommandQueue* commandQueue = ( DemodulatorThreadCommandQueue*)getInputQueue("CommandQueue");
std::vector<liquid_float_complex> in_buf_data; std::vector<liquid_float_complex> in_buf_data;
std::vector<liquid_float_complex> out_buf_data; std::vector<liquid_float_complex> out_buf_data;
// liquid_float_complex carrySample; // Keep the stream count even to simplify some demod operations // liquid_float_complex carrySample; // Keep the stream count even to simplify some demod operations
@ -308,6 +310,12 @@ void DemodulatorPreThread::run() {
buffers.purge(); buffers.purge();
DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue
iqInputQueue->push(inp);
workerThread->terminate();
t_Worker->detach();
delete t_Worker;
DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED); DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED);
tCmd.context = this; tCmd.context = this;
threadQueueNotify->push(tCmd); threadQueueNotify->push(tCmd);
@ -316,9 +324,4 @@ void DemodulatorPreThread::run() {
void DemodulatorPreThread::terminate() { void DemodulatorPreThread::terminate() {
terminated = true; terminated = true;
DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue
iqInputQueue->push(inp);
workerThread->terminate();
t_Worker->detach();
delete t_Worker;
} }

View File

@ -10,20 +10,11 @@
class DemodulatorPreThread : public IOThread { class DemodulatorPreThread : public IOThread {
public: public:
DemodulatorPreThread(DemodulatorThreadInputQueue* iqInputQueue, DemodulatorThreadPostInputQueue* iqOutputQueue, DemodulatorPreThread();
DemodulatorThreadControlCommandQueue *threadQueueControl, DemodulatorThreadCommandQueue* threadQueueNotify);
~DemodulatorPreThread(); ~DemodulatorPreThread();
void run(); void run();
void setCommandQueue(DemodulatorThreadCommandQueue *tQueue) {
commandQueue = tQueue;
}
void setDemodulatorControlQueue(DemodulatorThreadControlCommandQueue *tQueue) {
threadQueueControl = tQueue;
}
DemodulatorThreadParameters &getParams() { DemodulatorThreadParameters &getParams() {
return params; return params;
} }
@ -42,10 +33,6 @@ public:
#endif #endif
protected: protected:
DemodulatorThreadInputQueue* iqInputQueue;
DemodulatorThreadPostInputQueue* iqOutputQueue;
DemodulatorThreadCommandQueue* commandQueue;
msresamp_crcf iqResampler; msresamp_crcf iqResampler;
double iqResampleRatio; double iqResampleRatio;
std::vector<liquid_float_complex> resampledData; std::vector<liquid_float_complex> resampledData;
@ -71,6 +58,4 @@ protected:
DemodulatorThreadWorkerCommandQueue *workerQueue; DemodulatorThreadWorkerCommandQueue *workerQueue;
DemodulatorThreadWorkerResultQueue *workerResults; DemodulatorThreadWorkerResultQueue *workerResults;
DemodulatorThreadCommandQueue* threadQueueNotify;
DemodulatorThreadControlCommandQueue *threadQueueControl;
}; };