Experimental demodulator preprocessor thread

Demodulator becoming a CPU hotspot, trying to break up the workload
between threads..
This commit is contained in:
Charles J. Cliffe 2014-12-16 18:27:02 -05:00
parent 76c68cc4fa
commit c7a167a1d0
7 changed files with 267 additions and 256 deletions

View File

@ -125,6 +125,7 @@ SET (cubicsdr_sources
src/AppFrame.cpp src/AppFrame.cpp
src/sdr/SDRThread.cpp src/sdr/SDRThread.cpp
src/sdr/SDRPostThread.cpp src/sdr/SDRPostThread.cpp
src/demod/DemodulatorPreThread.cpp
src/demod/DemodulatorThread.cpp src/demod/DemodulatorThread.cpp
src/demod/DemodulatorWorkerThread.cpp src/demod/DemodulatorWorkerThread.cpp
src/demod/DemodulatorInstance.cpp src/demod/DemodulatorInstance.cpp
@ -151,6 +152,7 @@ SET (cubicsdr_headers
src/AppFrame.h src/AppFrame.h
src/sdr/SDRThread.h src/sdr/SDRThread.h
src/sdr/SDRPostThread.h src/sdr/SDRPostThread.h
src/demod/DemodulatorPreThread.h
src/demod/DemodulatorThread.h src/demod/DemodulatorThread.h
src/demod/DemodulatorWorkerThread.h src/demod/DemodulatorWorkerThread.h
src/demod/DemodulatorInstance.h src/demod/DemodulatorInstance.h

View File

@ -1,6 +1,8 @@
#pragma once #pragma once
#include "ThreadQueue.h" #include "ThreadQueue.h"
#include "CubicSDRDefs.h"
#include "liquid/liquid.h"
enum DemodulatorType { enum DemodulatorType {
DEMOD_TYPE_NULL, DEMOD_TYPE_NULL,
@ -17,6 +19,7 @@ public:
DEMOD_THREAD_CMD_NULL, DEMOD_THREAD_CMD_NULL,
DEMOD_THREAD_CMD_SET_BANDWIDTH, DEMOD_THREAD_CMD_SET_BANDWIDTH,
DEMOD_THREAD_CMD_SET_FREQUENCY, DEMOD_THREAD_CMD_SET_FREQUENCY,
DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED,
DEMOD_THREAD_CMD_DEMOD_TERMINATED, DEMOD_THREAD_CMD_DEMOD_TERMINATED,
DEMOD_THREAD_CMD_AUDIO_TERMINATED DEMOD_THREAD_CMD_AUDIO_TERMINATED
}; };
@ -58,6 +61,22 @@ public:
} }
}; };
class DemodulatorThreadPostIQData {
public:
std::vector<liquid_float_complex> data;
float audio_resample_ratio;
msresamp_crcf audio_resampler;
DemodulatorThreadPostIQData(): audio_resample_ratio(0), audio_resampler(NULL) {
}
~DemodulatorThreadPostIQData() {
}
};
class DemodulatorThreadAudioData { class DemodulatorThreadAudioData {
public: public:
unsigned int frequency; unsigned int frequency;
@ -84,4 +103,26 @@ public:
}; };
typedef ThreadQueue<DemodulatorThreadIQData> DemodulatorThreadInputQueue; typedef ThreadQueue<DemodulatorThreadIQData> DemodulatorThreadInputQueue;
typedef ThreadQueue<DemodulatorThreadPostIQData> DemodulatorThreadPostInputQueue;
typedef ThreadQueue<DemodulatorThreadCommand> DemodulatorThreadCommandQueue; typedef ThreadQueue<DemodulatorThreadCommand> DemodulatorThreadCommandQueue;
class DemodulatorThreadParameters {
public:
unsigned int frequency;
unsigned int inputRate;
unsigned int bandwidth; // set equal to disable second stage re-sampling?
unsigned int audioSampleRate;
DemodulatorType demodType;
DemodulatorThreadParameters() :
frequency(0), inputRate(SRATE), bandwidth(200000), audioSampleRate(
AUDIO_FREQUENCY), demodType(DEMOD_TYPE_FM) {
}
~DemodulatorThreadParameters() {
}
};

View File

@ -1,17 +1,22 @@
#include "DemodulatorInstance.h" #include "DemodulatorInstance.h"
DemodulatorInstance::DemodulatorInstance() : DemodulatorInstance::DemodulatorInstance() :
t_Demod(NULL), t_Audio(NULL), threadQueueDemod(NULL), demodulatorThread(NULL), terminated(false), audioTerminated(false), demodTerminated( t_Demod(NULL), t_PreDemod(NULL), t_Audio(NULL), threadQueueDemod(NULL), demodulatorThread(NULL), terminated(false), audioTerminated(false), demodTerminated(
false) { false), preDemodTerminated(false) {
label = new std::string("Unnamed"); label = new std::string("Unnamed");
threadQueueDemod = new DemodulatorThreadInputQueue; threadQueueDemod = new DemodulatorThreadInputQueue;
threadQueuePostDemod = new DemodulatorThreadPostInputQueue;
threadQueueCommand = new DemodulatorThreadCommandQueue; threadQueueCommand = new DemodulatorThreadCommandQueue;
threadQueueNotify = new DemodulatorThreadCommandQueue; threadQueueNotify = new DemodulatorThreadCommandQueue;
demodulatorThread = new DemodulatorThread(threadQueueDemod, threadQueueNotify);
demodulatorThread->setCommandQueue(threadQueueCommand); demodulatorPreThread = new DemodulatorPreThread(threadQueueDemod, threadQueuePostDemod, threadQueueNotify);
demodulatorPreThread->setCommandQueue(threadQueueCommand);
demodulatorThread = new DemodulatorThread(threadQueuePostDemod, threadQueueNotify);
audioInputQueue = new AudioThreadInputQueue; audioInputQueue = new AudioThreadInputQueue;
audioThread = new AudioThread(audioInputQueue, threadQueueNotify); audioThread = new AudioThread(audioInputQueue, threadQueueNotify);
demodulatorThread->setAudioInputQueue(audioInputQueue); demodulatorThread->setAudioInputQueue(audioInputQueue);
} }
@ -34,6 +39,12 @@ void DemodulatorInstance::run() {
pthread_attr_t attr; pthread_attr_t attr;
size_t size; size_t size;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 2048000);
pthread_attr_getstacksize(&attr, &size);
pthread_create(&t_PreDemod, &attr, &DemodulatorPreThread::pthread_helper, demodulatorPreThread);
pthread_attr_destroy(&attr);
pthread_attr_init(&attr); pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 2048000); pthread_attr_setstacksize(&attr, 2048000);
pthread_attr_getstacksize(&attr, &size); pthread_attr_getstacksize(&attr, &size);
@ -43,6 +54,7 @@ void DemodulatorInstance::run() {
std::cout << "Initialized demodulator stack size of " << size << std::endl; std::cout << "Initialized demodulator stack size of " << size << std::endl;
#else #else
t_PreDemod = new std::thread(&DemodulatorPreThread::threadMain, demodulatorPreThread);
t_Demod = new std::thread(&DemodulatorThread::threadMain, demodulatorThread); t_Demod = new std::thread(&DemodulatorThread::threadMain, demodulatorThread);
#endif #endif
} }
@ -59,16 +71,14 @@ DemodulatorThreadCommandQueue *DemodulatorInstance::getCommandQueue() {
} }
DemodulatorThreadParameters &DemodulatorInstance::getParams() { DemodulatorThreadParameters &DemodulatorInstance::getParams() {
return demodulatorThread->getParams(); return demodulatorPreThread->getParams();
} }
void DemodulatorInstance::terminate() { void DemodulatorInstance::terminate() {
std::cout << "Terminating demodulator preprocessor thread.." << std::endl;
demodulatorPreThread->terminate();
std::cout << "Terminating demodulator thread.." << std::endl; std::cout << "Terminating demodulator thread.." << std::endl;
demodulatorThread->terminate(); demodulatorThread->terminate();
//#ifdef __APPLE__
// pthread_join(t_Demod,NULL);
//#else
//#endif
std::cout << "Terminating demodulator audio thread.." << std::endl; std::cout << "Terminating demodulator audio thread.." << std::endl;
audioThread->terminate(); audioThread->terminate();
} }
@ -85,3 +95,43 @@ void DemodulatorInstance::setLabel(std::string labelStr) {
label = newLabel; label = newLabel;
delete oldLabel; delete oldLabel;
} }
bool DemodulatorInstance::isTerminated() {
while (!threadQueueNotify->empty()) {
DemodulatorThreadCommand cmd;
threadQueueNotify->pop(cmd);
switch (cmd.cmd) {
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED:
audioThread = NULL;
t_Audio->join();
audioTerminated = true;
break;
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_TERMINATED:
demodulatorThread = NULL;
#ifdef __APPLE__
pthread_join(t_Demod, NULL);
#else
t_Demod->join();
#endif
demodTerminated = true;
break;
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED:
demodulatorPreThread = NULL;
#ifdef __APPLE__
pthread_join(t_PreDemod, NULL);
#else
t_PreDemod->join();
#endif
preDemodTerminated = true;
break;
default:
break;
}
}
terminated = audioTerminated && demodTerminated && preDemodTerminated;
return terminated;
}

View File

@ -5,17 +5,22 @@
#include <thread> #include <thread>
#include "DemodulatorThread.h" #include "DemodulatorThread.h"
#include "DemodulatorPreThread.h"
class DemodulatorInstance { class DemodulatorInstance {
public: public:
DemodulatorThreadInputQueue* threadQueueDemod; DemodulatorThreadInputQueue* threadQueueDemod;
DemodulatorThreadPostInputQueue* threadQueuePostDemod;
DemodulatorThreadCommandQueue* threadQueueCommand; DemodulatorThreadCommandQueue* threadQueueCommand;
DemodulatorThreadCommandQueue* threadQueueNotify; DemodulatorThreadCommandQueue* threadQueueNotify;
DemodulatorPreThread *demodulatorPreThread;
DemodulatorThread *demodulatorThread; DemodulatorThread *demodulatorThread;
#ifdef __APPLE__ #ifdef __APPLE__
pthread_t t_PreDemod;
pthread_t t_Demod; pthread_t t_Demod;
#else #else
std::thread *t_PreDemod;
std::thread *t_Demod; std::thread *t_Demod;
#endif #endif
@ -44,6 +49,7 @@ private:
bool terminated; bool terminated;
bool demodTerminated; bool demodTerminated;
bool audioTerminated; bool audioTerminated;
bool preDemodTerminated;
}; };

View File

@ -5,7 +5,6 @@
#include <string> #include <string>
#include <sstream> #include <sstream>
DemodulatorMgr::DemodulatorMgr() : DemodulatorMgr::DemodulatorMgr() :
activeDemodulator(NULL), lastActiveDemodulator(NULL), activeVisualDemodulator(NULL) { activeDemodulator(NULL), lastActiveDemodulator(NULL), activeVisualDemodulator(NULL) {
@ -141,33 +140,3 @@ void DemodulatorMgr::garbageCollect() {
} }
} }
bool DemodulatorInstance::isTerminated() {
while (!threadQueueNotify->empty()) {
DemodulatorThreadCommand cmd;
threadQueueNotify->pop(cmd);
switch (cmd.cmd) {
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED:
audioThread = NULL;
t_Audio->join();
audioTerminated = true;
break;
case DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_TERMINATED:
demodulatorThread = NULL;
#ifdef __APPLE__
pthread_join(t_Demod,NULL);
#else
t_Demod->join();
#endif
demodTerminated = true;
break;
default:
break;
}
}
terminated = audioTerminated && demodTerminated;
return terminated;
}

View File

@ -1,4 +1,4 @@
#include "DemodulatorThread.h"
#include "CubicSDRDefs.h" #include "CubicSDRDefs.h"
#include <vector> #include <vector>
@ -6,8 +6,10 @@
#include <pthread.h> #include <pthread.h>
#endif #endif
DemodulatorThread::DemodulatorThread(DemodulatorThreadInputQueue* pQueue, DemodulatorThreadCommandQueue* threadQueueNotify) : #include "DemodulatorPreThread.h"
inputQueue(pQueue), visOutQueue(NULL), terminated(false), initialized(false), audio_resampler(NULL), resample_ratio(1), audio_resample_ratio(
DemodulatorPreThread::DemodulatorPreThread(DemodulatorThreadInputQueue* pQueueIn, DemodulatorThreadPostInputQueue* pQueueOut, DemodulatorThreadCommandQueue* threadQueueNotify) :
inputQueue(pQueueIn), postInputQueue(pQueueOut), terminated(false), initialized(false), audio_resampler(NULL), resample_ratio(1), audio_resample_ratio(
1), resampler(NULL), commandQueue(NULL), fir_filter(NULL), audioInputQueue(NULL), threadQueueNotify(threadQueueNotify) { 1), resampler(NULL), commandQueue(NULL), fir_filter(NULL), audioInputQueue(NULL), threadQueueNotify(threadQueueNotify) {
float kf = 0.5; // modulation factor float kf = 0.5; // modulation factor
@ -24,7 +26,7 @@ DemodulatorThread::DemodulatorThread(DemodulatorThreadInputQueue* pQueue, Demodu
t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread); t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread);
} }
void DemodulatorThread::initialize() { void DemodulatorPreThread::initialize() {
initialized = false; initialized = false;
resample_ratio = (float) (params.bandwidth) / (float) params.inputRate; resample_ratio = (float) (params.bandwidth) / (float) params.inputRate;
@ -74,26 +76,24 @@ void DemodulatorThread::initialize() {
last_params = params; last_params = params;
} }
DemodulatorThread::~DemodulatorThread() { DemodulatorPreThread::~DemodulatorPreThread() {
delete workerThread; delete workerThread;
delete workerQueue; delete workerQueue;
delete workerResults; delete workerResults;
} }
#ifdef __APPLE__ #ifdef __APPLE__
void *DemodulatorThread::threadMain() { void *DemodulatorPreThread::threadMain() {
#else #else
void DemodulatorThread::threadMain() { void DemodulatorPreThread::threadMain() {
#endif #endif
if (!initialized) { if (!initialized) {
initialize(); initialize();
} }
liquid_float_complex *in_buf = new liquid_float_complex[BUF_SIZE / 2];
liquid_float_complex *out_buf = new liquid_float_complex[BUF_SIZE / 2];
std::cout << "Demodulator thread started.." << std::endl; std::cout << "Demodulator preprocessor thread started.." << std::endl;
while (!terminated) { while (!terminated) {
DemodulatorThreadIQData inp; DemodulatorThreadIQData inp;
inputQueue->pop(inp); inputQueue->pop(inp);
@ -154,66 +154,47 @@ void DemodulatorThread::threadMain() {
std::vector<signed char> *data = &inp.data; std::vector<signed char> *data = &inp.data;
if (data->size()) { if (data->size()) {
liquid_float_complex *temp_buf; int bufSize = data->size() / 2;
for (int i = 0; i < BUF_SIZE / 2; i++) { liquid_float_complex in_buf_data[bufSize];
liquid_float_complex out_buf_data[bufSize];
liquid_float_complex *in_buf = in_buf_data;
liquid_float_complex *out_buf = out_buf_data;
liquid_float_complex *temp_buf = NULL;
for (int i = 0; i < bufSize; i++) {
in_buf[i].real = (float) (*data)[i * 2] / 127.0f; in_buf[i].real = (float) (*data)[i * 2] / 127.0f;
in_buf[i].imag = (float) (*data)[i * 2 + 1] / 127.0f; in_buf[i].imag = (float) (*data)[i * 2 + 1] / 127.0f;
} }
if (shift_freq != 0) { if (shift_freq != 0) {
if (shift_freq < 0) { if (shift_freq < 0) {
nco_crcf_mix_block_up(nco_shift, in_buf, out_buf, BUF_SIZE / 2); nco_crcf_mix_block_up(nco_shift, in_buf, out_buf, bufSize);
} else { } else {
nco_crcf_mix_block_down(nco_shift, in_buf, out_buf, BUF_SIZE / 2); nco_crcf_mix_block_down(nco_shift, in_buf, out_buf, bufSize);
} }
temp_buf = in_buf; temp_buf = in_buf;
in_buf = out_buf; in_buf = out_buf;
out_buf = temp_buf; out_buf = temp_buf;
} }
firfilt_crcf_execute_block(fir_filter, in_buf, BUF_SIZE / 2, out_buf); firfilt_crcf_execute_block(fir_filter, in_buf, bufSize, out_buf);
int out_size = ceil((float) (BUF_SIZE / 2) * resample_ratio); int out_size = ceil((float) (bufSize) * resample_ratio);
liquid_float_complex resampled_output[out_size]; DemodulatorThreadPostIQData resamp;
float demod_output[out_size];
resamp.audio_resample_ratio = audio_resample_ratio;
resamp.audio_resampler = audio_resampler;
resamp.data.resize(out_size);
unsigned int num_written; // number of values written to buffer unsigned int num_written; // number of values written to buffer
msresamp_crcf_execute(resampler, out_buf, (BUF_SIZE / 2), resampled_output, &num_written); msresamp_crcf_execute(resampler, out_buf, (bufSize), &resamp.data[0], &num_written);
freqdem_demodulate_block(fdem, resampled_output, num_written, demod_output); resamp.data.resize(num_written);
for (int i = 0; i < num_written; i++) { postInputQueue->push(resamp);
resampled_output[i].real = demod_output[i];
resampled_output[i].imag = 0;
}
int audio_out_size = ceil((float) (num_written) * audio_resample_ratio);
liquid_float_complex resampled_audio_output[audio_out_size];
unsigned int num_audio_written;
msresamp_crcf_execute(audio_resampler, resampled_output, num_written, resampled_audio_output, &num_audio_written);
std::vector<float> newBuffer;
newBuffer.resize(num_audio_written * 2);
for (int i = 0; i < num_audio_written; i++) {
liquid_float_complex y = resampled_audio_output[i];
newBuffer[i * 2] = y.real;
newBuffer[i * 2 + 1] = y.real;
}
AudioThreadInput ati;
ati.data = newBuffer;
if (audioInputQueue != NULL) {
audioInputQueue->push(ati);
}
if (visOutQueue != NULL) {
visOutQueue->push(ati);
}
} }
if (!workerResults->empty()) { if (!workerResults->empty()) {
@ -244,16 +225,13 @@ void DemodulatorThread::threadMain() {
} }
} }
delete in_buf; std::cout << "Demodulator preprocessor thread done." << std::endl;
delete out_buf; DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_PREPROCESS_TERMINATED);
std::cout << "Demodulator thread done." << std::endl;
DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_DEMOD_TERMINATED);
tCmd.context = this; tCmd.context = this;
threadQueueNotify->push(tCmd); threadQueueNotify->push(tCmd);
} }
void DemodulatorThread::terminate() { void DemodulatorPreThread::terminate() {
terminated = true; terminated = true;
DemodulatorThreadIQData inp; // push dummy to nudge queue DemodulatorThreadIQData inp; // push dummy to nudge queue
inputQueue->push(inp); inputQueue->push(inp);

View File

@ -2,46 +2,16 @@
#include <queue> #include <queue>
#include <vector> #include <vector>
#include "wx/wxprec.h"
#ifndef WX_PRECOMP
#include "wx/wx.h"
#endif
#include "wx/thread.h"
#include "liquid/liquid.h"
#include "CubicSDRDefs.h" #include "CubicSDRDefs.h"
#include "DemodulatorWorkerThread.h"
#include "DemodDefs.h" #include "DemodDefs.h"
#include "DemodulatorWorkerThread.h"
class DemodulatorThreadParameters { class DemodulatorPreThread {
public:
unsigned int frequency;
unsigned int inputRate;
unsigned int bandwidth; // set equal to disable second stage re-sampling?
unsigned int audioSampleRate;
DemodulatorType demodType;
DemodulatorThreadParameters() :
frequency(0), inputRate(SRATE), bandwidth(200000), audioSampleRate(
AUDIO_FREQUENCY), demodType(DEMOD_TYPE_FM) {
}
~DemodulatorThreadParameters() {
}
};
typedef ThreadQueue<AudioThreadInput> DemodulatorThreadOutputQueue;
class DemodulatorThread {
public: public:
DemodulatorThread(DemodulatorThreadInputQueue* pQueue, DemodulatorThreadCommandQueue* threadQueueNotify); DemodulatorPreThread(DemodulatorThreadInputQueue* pQueueIn, DemodulatorThreadPostInputQueue* pQueueOut, DemodulatorThreadCommandQueue* threadQueueNotify);
~DemodulatorThread(); ~DemodulatorPreThread();
#ifdef __APPLE__ #ifdef __APPLE__
void *threadMain(); void *threadMain();
@ -49,10 +19,6 @@ public:
void threadMain(); void threadMain();
#endif #endif
void setVisualOutputQueue(DemodulatorThreadOutputQueue *tQueue) {
visOutQueue = tQueue;
}
void setCommandQueue(DemodulatorThreadCommandQueue *tQueue) { void setCommandQueue(DemodulatorThreadCommandQueue *tQueue) {
commandQueue = tQueue; commandQueue = tQueue;
} }
@ -71,18 +37,17 @@ public:
#ifdef __APPLE__ #ifdef __APPLE__
static void *pthread_helper(void *context) { static void *pthread_helper(void *context) {
return ((DemodulatorThread *) context)->threadMain(); return ((DemodulatorPreThread *) context)->threadMain();
} }
#endif #endif
protected: protected:
DemodulatorThreadInputQueue* inputQueue; DemodulatorThreadInputQueue* inputQueue;
DemodulatorThreadOutputQueue* visOutQueue; DemodulatorThreadPostInputQueue* postInputQueue;
DemodulatorThreadCommandQueue* commandQueue; DemodulatorThreadCommandQueue* commandQueue;
AudioThreadInputQueue *audioInputQueue; AudioThreadInputQueue *audioInputQueue;
firfilt_crcf fir_filter; firfilt_crcf fir_filter;
msresamp_crcf resampler; msresamp_crcf resampler;
float resample_ratio; float resample_ratio;