From 5fc5e4269cef01fac73397fdfa53a0753fdf3a1d Mon Sep 17 00:00:00 2001 From: "Charles J. Cliffe" Date: Sun, 30 Nov 2014 23:33:55 -0500 Subject: [PATCH] Demodulator worker thread test --- CMakeLists.txt | 2 + src/demod/DemodulatorThread.cpp | 34 +++++++-- src/demod/DemodulatorThread.h | 12 +++- src/demod/DemodulatorWorkerThread.cpp | 78 ++++++++++++++++++++ src/demod/DemodulatorWorkerThread.h | 100 ++++++++++++++++++++++++++ src/sdr/SDRThread.h | 2 +- src/visual/WaterfallCanvas.cpp | 6 +- 7 files changed, 222 insertions(+), 12 deletions(-) create mode 100644 src/demod/DemodulatorWorkerThread.cpp create mode 100644 src/demod/DemodulatorWorkerThread.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f2390be..b20606b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -92,6 +92,7 @@ SET (cubicsdr_sources src/sdr/SDRThread.cpp src/sdr/SDRPostThread.cpp src/demod/DemodulatorThread.cpp + src/demod/DemodulatorWorkerThread.cpp src/demod/DemodulatorMgr.cpp src/audio/AudioThread.cpp src/util/Gradient.cpp @@ -113,6 +114,7 @@ SET (cubicsdr_headers src/sdr/SDRThread.h src/sdr/SDRPostThread.h src/demod/DemodulatorThread.h + src/demod/DemodulatorWorkerThread.h src/demod/DemodulatorMgr.h src/audio/AudioThread.h src/util/Gradient.h diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 7588521..e1d1160 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -4,7 +4,7 @@ DemodulatorThread::DemodulatorThread(DemodulatorThreadInputQueue* pQueue) : inputQueue(pQueue), visOutQueue(NULL), terminated(false), initialized(false), audio_resampler(NULL), resample_ratio(1), audio_resample_ratio( - 1), resampler(NULL), commandQueue(NULL), fir_filter(NULL) { + 1), resampler(NULL), commandQueue(NULL), fir_filter(NULL), audioInputQueue(NULL) { float kf = 0.5; // modulation factor fdem = freqdem_create(kf); @@ -13,6 +13,11 @@ DemodulatorThread::DemodulatorThread(DemodulatorThreadInputQueue* pQueue) : nco_shift = nco_crcf_create(LIQUID_VCO); shift_freq = 0; + workerQueue = new DemodulatorThreadWorkerCommandQueue; + workerResults = new DemodulatorThreadWorkerResultQueue; + workerThread = new DemodulatorWorkerThread(workerQueue,workerResults); + + t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain,workerThread); } void DemodulatorThread::initialize() { @@ -66,6 +71,9 @@ void DemodulatorThread::initialize() { } DemodulatorThread::~DemodulatorThread() { + delete workerThread; + delete workerQueue; + delete workerResults; } void DemodulatorThread::threadMain() { @@ -79,29 +87,40 @@ void DemodulatorThread::threadMain() { DemodulatorThreadIQData inp; inputQueue->pop(inp); + bool bandwidthChanged = false; + DemodulatorThreadParameters bandwidthParams = params; + if (!commandQueue->empty()) { bool paramsChanged = false; while (!commandQueue->empty()) { DemodulatorThreadCommand command; commandQueue->pop(command); switch (command.cmd) { - case DemodulatorThreadCommand::SDR_THREAD_CMD_SET_BANDWIDTH: + case DemodulatorThreadCommand::DEMOD_THREAD_CMD_SET_BANDWIDTH: if (command.int_value < 3000) { command.int_value = 3000; } if (command.int_value > SRATE) { command.int_value = SRATE; } - params.bandwidth = command.int_value; - paramsChanged = true; + bandwidthParams.bandwidth = command.int_value; + bandwidthChanged = true; break; - case DemodulatorThreadCommand::SDR_THREAD_CMD_SET_FREQUENCY: + case DemodulatorThreadCommand::DEMOD_THREAD_CMD_SET_FREQUENCY: params.frequency = command.int_value; break; } } - if (paramsChanged) { + if (bandwidthChanged) { +// DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS); +// command.audioSampleRate = bandwidthParams.audioSampleRate; +// command.bandwidth = bandwidthParams.bandwidth; +// command.frequency = bandwidthParams.frequency; +// command.inputRate = bandwidthParams.inputRate; +// +// workerQueue->push(command); + initialize(); while (!inputQueue->empty()) { // catch up inputQueue->pop(inp); @@ -206,4 +225,7 @@ void DemodulatorThread::terminate() { terminated = true; DemodulatorThreadIQData inp; // push dummy to nudge queue inputQueue->push(inp); + + workerThread->terminate(); + t_Worker->join(); } diff --git a/src/demod/DemodulatorThread.h b/src/demod/DemodulatorThread.h index 9198af4..3fefd13 100644 --- a/src/demod/DemodulatorThread.h +++ b/src/demod/DemodulatorThread.h @@ -14,6 +14,7 @@ #include "AudioThread.h" #include "ThreadQueue.h" #include "CubicSDRDefs.h" +#include "DemodulatorWorkerThread.h" enum DemodulatorType { DEMOD_TYPE_NULL, DEMOD_TYPE_AM, DEMOD_TYPE_FM, DEMOD_TYPE_LSB, DEMOD_TYPE_USB @@ -22,11 +23,11 @@ enum DemodulatorType { class DemodulatorThreadCommand { public: enum DemodulatorThreadCommandEnum { - SDR_THREAD_CMD_NULL, SDR_THREAD_CMD_SET_BANDWIDTH, SDR_THREAD_CMD_SET_FREQUENCY + DEMOD_THREAD_CMD_NULL, DEMOD_THREAD_CMD_SET_BANDWIDTH, DEMOD_THREAD_CMD_SET_FREQUENCY }; DemodulatorThreadCommand() : - cmd(cmd), int_value(SDR_THREAD_CMD_NULL) { + cmd(DEMOD_THREAD_CMD_NULL), int_value(0) { } @@ -106,6 +107,7 @@ typedef ThreadQueue DemodulatorThreadInputQueue; typedef ThreadQueue DemodulatorThreadOutputQueue; typedef ThreadQueue DemodulatorThreadCommandQueue; + class DemodulatorThread { public: @@ -158,4 +160,10 @@ protected: std::atomic terminated; std::atomic initialized; + + DemodulatorWorkerThread *workerThread; + std::thread *t_Worker; + + DemodulatorThreadWorkerCommandQueue *workerQueue; + DemodulatorThreadWorkerResultQueue *workerResults; }; diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp new file mode 100644 index 0000000..84c44d6 --- /dev/null +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -0,0 +1,78 @@ +#include "DemodulatorWorkerThread.h" +#include "CubicSDRDefs.h" +#include + +DemodulatorWorkerThread::DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out) : + terminated(false), commandQueue(in), resultQueue(out) { + +} + +DemodulatorWorkerThread::~DemodulatorWorkerThread() { +} + +void DemodulatorWorkerThread::threadMain() { + + std::cout << "Demodulator worker thread started.." << std::endl; + + while (!terminated) { + bool filterChanged = false; + DemodulatorWorkerThreadCommand filterCommand; + DemodulatorWorkerThreadCommand command; + DemodulatorWorkerThreadResult result; + + bool done = false; + while (!done) { + commandQueue->pop(command); + switch (command.cmd) { + case DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS: + filterChanged = true; + filterCommand = command; + break; + } + done = commandQueue->empty(); + } + + if (filterChanged) { + result.cmd = DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS; + + result.resample_ratio = (float) (filterCommand.bandwidth) / (float) filterCommand.inputRate; + result.audio_resample_ratio = (float) (filterCommand.audioSampleRate) / (float) filterCommand.bandwidth; + + float fc = 0.5 * ((double) filterCommand.bandwidth / (double) filterCommand.inputRate); // filter cutoff frequency + + if (fc <= 0) { + fc = 0; + } + + if (fc >= 0.5) { + fc = 0.5; + } + + float ft = 0.05f; // filter transition + float As = 60.0f; // stop-band attenuation [dB] + float mu = 0.0f; // fractional timing offset + + // estimate required filter length and generate filter + unsigned int h_len = estimate_req_filter_len(ft, As); + float h[h_len]; + liquid_firdes_kaiser(h_len, fc, As, mu, h); + + result.fir_filter = firfilt_crcf_create(h, h_len); + + result.resampler = msresamp_crcf_create(result.resample_ratio, As); + result.audio_resampler = msresamp_crcf_create(result.audio_resample_ratio, As); + // msresamp_crcf_print(audio_resampler); + + resultQueue->push(result); + } + + } + + std::cout << "Demodulator worker thread done." << std::endl; +} + +void DemodulatorWorkerThread::terminate() { + terminated = true; + DemodulatorWorkerThreadCommand inp; // push dummy to nudge queue + commandQueue->push(inp); +} diff --git a/src/demod/DemodulatorWorkerThread.h b/src/demod/DemodulatorWorkerThread.h new file mode 100644 index 0000000..9911972 --- /dev/null +++ b/src/demod/DemodulatorWorkerThread.h @@ -0,0 +1,100 @@ +#pragma once + +#include +#include +#include "wx/wxprec.h" + +#ifndef WX_PRECOMP +#include "wx/wx.h" +#endif + +#include "wx/thread.h" + +#include "liquid/liquid.h" +#include "AudioThread.h" +#include "ThreadQueue.h" +#include "CubicSDRDefs.h" + +class DemodulatorWorkerThreadResult { +public: + enum DemodulatorThreadResultEnum { + DEMOD_WORKER_THREAD_RESULT_NULL, DEMOD_WORKER_THREAD_RESULT_FILTERS + }; + + DemodulatorWorkerThreadResult() : + cmd(DEMOD_WORKER_THREAD_RESULT_NULL), audioSampleRate(0), bandwidth(0), inputRate(0), fir_filter(NULL), resampler(NULL), resample_ratio( + 0), audio_resampler(NULL), audio_resample_ratio(0) { + + } + + DemodulatorWorkerThreadResult(DemodulatorThreadResultEnum cmd) : + cmd(cmd), audioSampleRate(0), bandwidth(0), inputRate(0), fir_filter(NULL), resampler(NULL), resample_ratio(0), audio_resampler(NULL), audio_resample_ratio( + 0) { + + } + + firfilt_crcf fir_filter; + msresamp_crcf resampler; + float resample_ratio; + msresamp_crcf audio_resampler; + float audio_resample_ratio; + + unsigned int inputRate; + unsigned int bandwidth; + unsigned int audioSampleRate; + + DemodulatorThreadResultEnum cmd; +}; + +class DemodulatorWorkerThreadCommand { +public: + enum DemodulatorThreadCommandEnum { + DEMOD_WORKER_THREAD_CMD_NULL, DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS + }; + + DemodulatorWorkerThreadCommand() : + cmd(DEMOD_WORKER_THREAD_CMD_NULL), frequency(0), inputRate(0), bandwidth(0), audioSampleRate(0) { + + } + + DemodulatorWorkerThreadCommand(DemodulatorThreadCommandEnum cmd) : + cmd(cmd), frequency(0), inputRate(0), bandwidth(0), audioSampleRate(0) { + + } + + unsigned int frequency; + unsigned int inputRate; + unsigned int bandwidth; + unsigned int audioSampleRate; + + DemodulatorThreadCommandEnum cmd; +}; + +typedef ThreadQueue DemodulatorThreadWorkerCommandQueue; +typedef ThreadQueue DemodulatorThreadWorkerResultQueue; + +class DemodulatorWorkerThread { +public: + + DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out); + ~DemodulatorWorkerThread(); + + void threadMain(); + + void setCommandQueue(DemodulatorThreadWorkerCommandQueue *tQueue) { + commandQueue = tQueue; + } + + void setResultQueue(DemodulatorThreadWorkerResultQueue *tQueue) { + resultQueue = tQueue; + } + + void terminate(); + +protected: + + DemodulatorThreadWorkerCommandQueue *commandQueue; + DemodulatorThreadWorkerResultQueue *resultQueue; + + std::atomic terminated; +}; diff --git a/src/sdr/SDRThread.h b/src/sdr/SDRThread.h index bdec3cf..67e4b16 100644 --- a/src/sdr/SDRThread.h +++ b/src/sdr/SDRThread.h @@ -21,7 +21,7 @@ public: SDR_THREAD_CMD_TUNE }; - SDRThreadCommand() : cmd(cmd), int_value(SDR_THREAD_CMD_NULL) { + SDRThreadCommand() : cmd(SDR_THREAD_CMD_NULL), int_value(0) { } diff --git a/src/visual/WaterfallCanvas.cpp b/src/visual/WaterfallCanvas.cpp index bb4dc92..942da14 100644 --- a/src/visual/WaterfallCanvas.cpp +++ b/src/visual/WaterfallCanvas.cpp @@ -221,7 +221,7 @@ void WaterfallCanvas::mouseMoved(wxMouseEvent& event) { } DemodulatorThreadCommand command; - command.cmd = DemodulatorThreadCommand::SDR_THREAD_CMD_SET_BANDWIDTH; + command.cmd = DemodulatorThreadCommand::DEMOD_THREAD_CMD_SET_BANDWIDTH; activeDemodulatorBandwidth = activeDemodulatorBandwidth + bwDiff; if (activeDemodulatorBandwidth < 1000) { activeDemodulatorBandwidth = 1000; @@ -242,7 +242,7 @@ void WaterfallCanvas::mouseMoved(wxMouseEvent& event) { } DemodulatorThreadCommand command; - command.cmd = DemodulatorThreadCommand::SDR_THREAD_CMD_SET_FREQUENCY; + command.cmd = DemodulatorThreadCommand::DEMOD_THREAD_CMD_SET_FREQUENCY; activeDemodulatorFrequency = activeDemodulatorFrequency + bwDiff; command.int_value = activeDemodulatorFrequency; @@ -327,7 +327,7 @@ void WaterfallCanvas::mouseReleased(wxMouseEvent& event) { int freq = center_freq - (int) (0.5 * (float) SRATE) + (int) ((float) pos * (float) SRATE); DemodulatorThreadCommand command; - command.cmd = DemodulatorThreadCommand::SDR_THREAD_CMD_SET_FREQUENCY; + command.cmd = DemodulatorThreadCommand::DEMOD_THREAD_CMD_SET_FREQUENCY; command.int_value = freq; demod->getCommandQueue()->push(command);