SDRPostThread cleanup/refactor

- Cache active demodulators and channel info instead of running the
whole list each time
This commit is contained in:
Charles J. Cliffe 2015-10-14 18:09:29 -04:00
parent edd154296c
commit d8c048fecc
2 changed files with 124 additions and 88 deletions

View File

@ -5,12 +5,21 @@
#include <vector>
#include <deque>
SDRPostThread::SDRPostThread() : IOThread(),
iqDataInQueue(NULL), iqDataOutQueue(NULL), iqVisualQueue(NULL), dcFilter(NULL){
SDRPostThread::SDRPostThread() : IOThread() {
iqDataInQueue = NULL;
iqDataOutQueue = NULL;
iqVisualQueue = NULL;
swapIQ.store(false);
numChannels = 0;
channelizer = NULL;
sampleRate = 0;
nRunDemods = 0;
doRefresh.store(false);
dcFilter = iirfilt_crcf_create_dc_blocker(0.0005);
}
SDRPostThread::~SDRPostThread() {
@ -19,6 +28,7 @@ SDRPostThread::~SDRPostThread() {
void SDRPostThread::bindDemodulator(DemodulatorInstance *demod) {
busy_demod.lock();
demodulators.push_back(demod);
doRefresh.store(true);
busy_demod.unlock();
}
@ -32,6 +42,7 @@ void SDRPostThread::removeDemodulator(DemodulatorInstance *demod) {
if (i != demodulators.end()) {
demodulators.erase(i);
doRefresh.store(true);
}
busy_demod.unlock();
}
@ -44,6 +55,81 @@ bool SDRPostThread::getSwapIQ() {
return this->swapIQ.load();
}
void SDRPostThread::initPFBChannelizer() {
// std::cout << "Initializing post-process FIR polyphase filterbank channelizer with " << numChannels << " channels." << std::endl;
if (channelizer) {
firpfbch2_crcf_destroy(channelizer);
}
channelizer = firpfbch2_crcf_create_kaiser(LIQUID_ANALYZER, numChannels, 1, 60);
chanBw = (sampleRate / numChannels) * 2;
chanCenters.resize(numChannels);
demodChannelActive.resize(numChannels);
// firpfbch2 returns 2x sample rate per channel
// so, max demodulation without gaps is 1/2 chanBw ..?
// std::cout << "Channel bandwidth spacing: " << (chanBw/2) << " actual bandwidth: " << chanBw << std::endl;
}
void SDRPostThread::updateActiveDemodulators() {
// In range?
std::vector<DemodulatorInstance *>::iterator demod_i;
nRunDemods = 0;
for (demod_i = demodulators.begin(); demod_i != demodulators.end(); demod_i++) {
DemodulatorInstance *demod = *demod_i;
DemodulatorThreadInputQueue *demodQueue = demod->getIQInputDataPipe();
// not in range?
if (abs(frequency - demod->getFrequency()) > (sampleRate / 2)) {
// deactivate if active
if (demod->isActive() && !demod->isFollow() && !demod->isTracking()) {
demod->setActive(false);
DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData;
dummyDataOut->frequency = frequency;
dummyDataOut->sampleRate = sampleRate;
demodQueue->push(dummyDataOut);
}
// follow if follow mode
if (demod->isFollow() && wxGetApp().getFrequency() != demod->getFrequency()) {
wxGetApp().setFrequency(demod->getFrequency());
demod->setFollow(false);
}
} else if (!demod->isActive()) { // in range, activate if not activated
demod->setActive(true);
if (wxGetApp().getDemodMgr().getLastActiveDemodulator() == NULL) {
wxGetApp().getDemodMgr().setActiveDemodulator(demod);
}
}
if (!demod->isActive()) {
continue;
}
// Add to the current run
if (nRunDemods == runDemods.size()) {
runDemods.push_back(demod);
demodChannel.push_back(-1);
} else {
runDemods[nRunDemods] = demod;
demodChannel[nRunDemods] = -1;
}
nRunDemods++;
}
}
void SDRPostThread::updateChannels() {
// calculate channel center frequencies, todo: cache
for (int i = 0; i < numChannels/2; i++) {
int ofs = ((chanBw/2) * i);
chanCenters[i] = frequency + ofs;
chanCenters[i+(numChannels/2)] = frequency - (sampleRate/2) + ofs;
}
}
void SDRPostThread::run() {
#ifdef __APPLE__
pthread_t tID = pthread_self(); // ID of this thread
@ -52,28 +138,14 @@ void SDRPostThread::run() {
pthread_setschedparam(tID, SCHED_FIFO, &prio);
#endif
dcFilter = iirfilt_crcf_create_dc_blocker(0.0005);
std::cout << "SDR post-processing thread started.." << std::endl;
iqDataInQueue = (SDRThreadIQDataQueue*)getInputQueue("IQDataInput");
iqDataOutQueue = (DemodulatorThreadInputQueue*)getOutputQueue("IQDataOutput");
iqVisualQueue = (DemodulatorThreadInputQueue*)getOutputQueue("IQVisualDataOutput");
ReBuffer<DemodulatorThreadIQData> buffers;
std::vector<liquid_float_complex> fpData;
std::vector<liquid_float_complex> dataOut;
iqDataInQueue->set_max_num_items(0);
std::vector<long long> chanCenters;
long long chanBw;
int nRunDemods = 0;
std::vector<DemodulatorInstance *> runDemods;
std::vector<int> demodChannel;
std::vector<int> demodChannelActive;
while (!terminated) {
SDRThreadIQData *data_in;
@ -84,20 +156,8 @@ void SDRPostThread::run() {
if (numChannels != data_in->numChannels || sampleRate != data_in->sampleRate) {
numChannels = data_in->numChannels;
sampleRate = data_in->sampleRate;
std::cout << "Initializing post-process FIR polyphase filterbank channelizer with " << numChannels << " channels." << std::endl;
if (channelizer) {
firpfbch2_crcf_destroy(channelizer);
}
channelizer = firpfbch2_crcf_create_kaiser(LIQUID_ANALYZER, numChannels, 1, 60);
chanBw = (data_in->sampleRate / numChannels) * 2;
chanCenters.resize(numChannels);
demodChannelActive.resize(numChannels);
// firpfbch2 returns 2x sample rate per channel
// so, max demodulation without gaps is 1/2 chanBw ..?
std::cout << "Channel bandwidth spacing: " << (chanBw/2) << " actual bandwidth: " << chanBw << std::endl;
initPFBChannelizer();
doRefresh.store(true);
}
int dataSize = data_in->data.size();
@ -157,62 +217,20 @@ void SDRPostThread::run() {
busy_demod.lock();
if (frequency != data_in->frequency) {
frequency = data_in->frequency;
doRefresh.store(true);
}
if (doRefresh.load()) {
updateActiveDemodulators();
updateChannels();
doRefresh.store(false);
}
// Find active demodulators
if (demodulators.size()) {
// In range?
std::vector<DemodulatorInstance *>::iterator demod_i;
nRunDemods = 0;
for (demod_i = demodulators.begin(); demod_i != demodulators.end(); demod_i++) {
DemodulatorInstance *demod = *demod_i;
DemodulatorThreadInputQueue *demodQueue = demod->getIQInputDataPipe();
// not in range?
if (abs(data_in->frequency - demod->getFrequency()) > (data_in->sampleRate / 2)) {
// deactivate if active
if (demod->isActive() && !demod->isFollow() && !demod->isTracking()) {
demod->setActive(false);
DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData;
dummyDataOut->frequency = data_in->frequency;
dummyDataOut->sampleRate = data_in->sampleRate;
demodQueue->push(dummyDataOut);
}
// follow if follow mode
if (demod->isFollow() && wxGetApp().getFrequency() != demod->getFrequency()) {
wxGetApp().setFrequency(demod->getFrequency());
demod->setFollow(false);
}
} else if (!demod->isActive()) { // in range, activate if not activated
demod->setActive(true);
if (wxGetApp().getDemodMgr().getLastActiveDemodulator() == NULL) {
wxGetApp().getDemodMgr().setActiveDemodulator(demod);
}
}
if (!demod->isActive()) {
continue;
}
// Add to the current run
if (nRunDemods == runDemods.size()) {
runDemods.push_back(demod);
demodChannel.push_back(-1);
} else {
runDemods[nRunDemods] = demod;
demodChannel[nRunDemods] = -1;
}
nRunDemods++;
}
// calculate channel center frequencies, todo: cache
for (int i = 0; i < numChannels/2; i++) {
int ofs = ((chanBw/2) * i);
chanCenters[i] = data_in->frequency + ofs;
chanCenters[i+(numChannels/2)] = data_in->frequency - (data_in->sampleRate/2) + ofs;
}
if (nRunDemods) {
// channelize data
// firpfbch2 output rate is 2 x ( input rate / channels )
for (int i = 0, iMax = dataSize; i < iMax; i+=numChannels/2) {

View File

@ -31,7 +31,25 @@ protected:
iirfilt_crcf dcFilter;
std::atomic_bool swapIQ;
private:
void initPFBChannelizer();
void updateActiveDemodulators();
void updateChannels();
ReBuffer<DemodulatorThreadIQData> buffers;
std::vector<liquid_float_complex> fpData;
std::vector<liquid_float_complex> dataOut;
std::vector<long long> chanCenters;
long long chanBw;
int nRunDemods;
std::vector<DemodulatorInstance *> runDemods;
std::vector<int> demodChannel;
std::vector<int> demodChannelActive;
ReBuffer<DemodulatorThreadIQData> visualDataBuffers;
atomic_bool doRefresh;
int numChannels, sampleRate;
long long frequency;
firpfbch2_crcf channelizer;
};