SDRPostThread clean-up and re-factoring

This commit is contained in:
Charles J. Cliffe 2018-02-20 00:16:54 -05:00
parent afc29303c9
commit 8cb5e9e244
3 changed files with 266 additions and 381 deletions

View File

@ -2252,7 +2252,6 @@ void AppFrame::OnIdle(wxIdleEvent& event) {
} }
wproc->setView(waterfallCanvas->getViewState(), waterfallCanvas->getCenterFrequency(), waterfallCanvas->getBandwidth()); wproc->setView(waterfallCanvas->getViewState(), waterfallCanvas->getCenterFrequency(), waterfallCanvas->getBandwidth());
wxGetApp().getSDRPostThread()->setIQVisualRange(waterfallCanvas->getCenterFrequency(), waterfallCanvas->getBandwidth());
proc->setView(wproc->isView(), wproc->getCenterFrequency(), wproc->getBandwidth()); proc->setView(wproc->isView(), wproc->getCenterFrequency(), wproc->getBandwidth());

View File

@ -20,42 +20,28 @@ SDRPostThread::SDRPostThread() : IOThread(), buffers("SDRPostThreadBuffers"), vi
numChannels = 0; numChannels = 0;
channelizer = nullptr; channelizer = nullptr;
channelizer2 = nullptr; channelizer2 = nullptr;
// Channel mode default temporary for testing
chanMode = 2; chanMode = 2;
lastChanMode = 0; lastChanMode = 0;
sampleRate = 0; sampleRate = 0;
visFrequency.store(0);
visBandwidth.store(0);
doRefresh.store(false); doRefresh.store(false);
dcFilter = iirfilt_crcf_create_dc_blocker(0.0005f); dcFilter = iirfilt_crcf_create_dc_blocker(0.0005f);
} }
SDRPostThread::~SDRPostThread() { SDRPostThread::~SDRPostThread() {
} }
void SDRPostThread::notifyDemodulatorsChanged() { void SDRPostThread::notifyDemodulatorsChanged() {
doRefresh.store(true); doRefresh.store(true);
} }
void SDRPostThread::initPFBChannelizer() {
// std::cout << "Initializing post-process FIR polyphase filterbank channelizer with " << numChannels << " channels." << std::endl;
if (channelizer) {
firpfbch_crcf_destroy(channelizer);
}
channelizer = firpfbch_crcf_create_kaiser(LIQUID_ANALYZER, numChannels, 4, 60);
chanBw = (sampleRate / numChannels);
chanCenters.resize(numChannels+1);
demodChannelActive.resize(numChannels+1);
// std::cout << "Channel bandwidth spacing: " << (chanBw) << std::endl;
}
// Update the active list of demodulators for handling
void SDRPostThread::updateActiveDemodulators() { void SDRPostThread::updateActiveDemodulators() {
// In range? // In range?
@ -114,8 +100,8 @@ void SDRPostThread::updateActiveDemodulators() {
} }
} }
void SDRPostThread::resetAllDemodulators() {
void SDRPostThread::resetAllDemodulators() {
//retreive the current list of demodulators: //retreive the current list of demodulators:
auto demodulators = wxGetApp().getDemodMgr().getDemodulators(); auto demodulators = wxGetApp().getDemodMgr().getDemodulators();
@ -128,6 +114,8 @@ void SDRPostThread::resetAllDemodulators() {
doRefresh.store(true); doRefresh.store(true);
} }
// Update the channel positions and frequencies
void SDRPostThread::updateChannels() { void SDRPostThread::updateChannels() {
// calculate channel center frequencies, todo: cache // calculate channel center frequencies, todo: cache
for (int i = 0; i < numChannels/2; i++) { for (int i = 0; i < numChannels/2; i++) {
@ -138,6 +126,8 @@ void SDRPostThread::updateChannels() {
chanCenters[numChannels] = frequency + (sampleRate/2); chanCenters[numChannels] = frequency + (sampleRate/2);
} }
// Find the channelizer channel that corresponds to the given frequency
int SDRPostThread::getChannelAt(long long frequency) { int SDRPostThread::getChannelAt(long long frequency) {
int chan = -1; int chan = -1;
long long minDelta = sampleRate; long long minDelta = sampleRate;
@ -151,19 +141,17 @@ int SDRPostThread::getChannelAt(long long frequency) {
return chan; return chan;
} }
void SDRPostThread::setIQVisualRange(long long frequency, int bandwidth) {
visFrequency.store(frequency);
visBandwidth.store(bandwidth);
}
void SDRPostThread::setChannelizerType(SDRPostThreadChannelizerType chType) { void SDRPostThread::setChannelizerType(SDRPostThreadChannelizerType chType) {
chanMode.store((int)chType); chanMode.store((int)chType);
} }
SDRPostThreadChannelizerType SDRPostThread::getChannelizerType() { SDRPostThreadChannelizerType SDRPostThread::getChannelizerType() {
return (SDRPostThreadChannelizerType) chanMode.load(); return (SDRPostThreadChannelizerType) chanMode.load();
} }
void SDRPostThread::run() { void SDRPostThread::run() {
#ifdef __APPLE__ #ifdef __APPLE__
pthread_t tID = pthread_self(); // ID of this thread pthread_t tID = pthread_self(); // ID of this thread
@ -189,6 +177,9 @@ void SDRPostThread::run() {
bool doUpdate = false; bool doUpdate = false;
if (data_in && data_in->data.size()) { if (data_in && data_in->data.size()) {
pushVisualData(data_in.get());
if(data_in->numChannels > 1) { if(data_in->numChannels > 1) {
if (chanMode == 1) { if (chanMode == 1) {
runPFBCH(data_in.get()); runPFBCH(data_in.get());
@ -210,6 +201,7 @@ void SDRPostThread::run() {
//Only update the list of demodulators here //Only update the list of demodulators here
if (doUpdate || doRefresh.load()) { if (doUpdate || doRefresh.load()) {
updateActiveDemodulators(); updateActiveDemodulators();
doRefresh.store(false);
} }
} //end while } //end while
@ -231,110 +223,8 @@ void SDRPostThread::terminate() {
iqActiveDemodVisualQueue->flush(); iqActiveDemodVisualQueue->flush();
} }
void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { // Push visual data; i.e. Main Waterfall (all frames) and Spectrum (active frame)
if (sampleRate != data_in->sampleRate) { void SDRPostThread::pushVisualData(SDRThreadIQData *data_in) {
sampleRate = data_in->sampleRate;
numChannels = 1;
doRefresh.store(true);
}
size_t dataSize = data_in->data.size();
size_t outSize = data_in->data.size();
if (outSize > dataOut.capacity()) {
dataOut.reserve(outSize);
}
if (outSize != dataOut.size()) {
dataOut.resize(outSize);
}
if (frequency != data_in->frequency) {
frequency = data_in->frequency;
doRefresh.store(true);
}
if (doRefresh.load()) {
updateActiveDemodulators();
doRefresh.store(false);
}
size_t refCount = runDemods.size();
bool doIQDataOut = (iqDataOutQueue != nullptr && !iqDataOutQueue->full());
bool doDemodVisOut = (runDemods.size() > 0 && iqActiveDemodVisualQueue != nullptr && !iqActiveDemodVisualQueue->full());
bool doVisOut = (iqVisualQueue != nullptr && !iqVisualQueue->full());
if (doIQDataOut) {
refCount++;
}
if (doDemodVisOut) {
refCount++;
}
if (doVisOut) {
refCount++;
}
if (refCount) {
DemodulatorThreadIQDataPtr demodDataOut = buffers.getBuffer();
demodDataOut->frequency = frequency;
demodDataOut->sampleRate = sampleRate;
if (demodDataOut->data.size() != dataSize) {
if (demodDataOut->data.capacity() < dataSize) {
demodDataOut->data.reserve(dataSize);
}
demodDataOut->data.resize(dataSize);
}
iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], dataSize, &demodDataOut->data[0]);
if (doDemodVisOut) {
//VSO: blocking push
iqActiveDemodVisualQueue->push(demodDataOut);
}
if (doIQDataOut) {
//VSO: blocking push
iqDataOutQueue->push(demodDataOut);
}
if (doVisOut) {
//VSO: blocking push
iqVisualQueue->push(demodDataOut);
}
for (size_t i = 0; i < runDemods.size(); i++) {
// try-push() : we do our best to only stimulate active demods, but some could happen to be dead, full, or indeed non-active.
//so in short never block here no matter what.
if (!runDemods[i]->getIQInputDataPipe()->try_push(demodDataOut)) {
// std::cout << "SDRPostThread::runSingleCH() attempt to push into demod '" << runDemods[i]->getLabel()
// << "' (" << runDemods[i]->getFrequency() << " Hz) failed, demod is either too busy, not-active, or dead..." << std::endl << std::flush;
std::this_thread::yield();
}
}
}
}
void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
if (numChannels != data_in->numChannels || sampleRate != data_in->sampleRate || chanMode != lastChanMode) {
numChannels = data_in->numChannels;
sampleRate = data_in->sampleRate;
initPFBChannelizer();
lastChanMode = 1;
doRefresh.store(true);
}
size_t dataSize = data_in->data.size();
size_t outSize = data_in->data.size();
if (outSize > dataOut.capacity()) {
dataOut.reserve(outSize);
}
if (outSize != dataOut.size()) {
dataOut.resize(outSize);
}
if (iqDataOutQueue != nullptr && !iqDataOutQueue->full()) { if (iqDataOutQueue != nullptr && !iqDataOutQueue->full()) {
DemodulatorThreadIQDataPtr iqDataOut = visualDataBuffers.getBuffer(); DemodulatorThreadIQDataPtr iqDataOut = visualDataBuffers.getBuffer();
@ -346,7 +236,7 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
iqDataOut->frequency = data_in->frequency; iqDataOut->frequency = data_in->frequency;
iqDataOut->sampleRate = data_in->sampleRate; iqDataOut->sampleRate = data_in->sampleRate;
iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize); iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + data_in->data.size());
//VSO: blocking push //VSO: blocking push
iqDataOutQueue->push(iqDataOut); iqDataOutQueue->push(iqDataOut);
@ -356,120 +246,231 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
iqVisualQueue->push(iqDataOut); iqVisualQueue->push(iqDataOut);
} }
} }
}
if (frequency != data_in->frequency) { // Run without any processing; each demod gets the full SDR bandwidth to handle on it's own
void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) {
bool refreshed = false;
if (sampleRate != data_in->sampleRate || doRefresh.load()) {
sampleRate = data_in->sampleRate;
numChannels = 1;
refreshed = true;
}
if (refreshed || frequency != data_in->frequency) {
frequency = data_in->frequency; frequency = data_in->frequency;
doRefresh.store(true);
}
if (doRefresh.load()) {
updateActiveDemodulators(); updateActiveDemodulators();
updateChannels();
doRefresh.store(false);
} }
DemodulatorInstancePtr activeDemod = wxGetApp().getDemodMgr().getLastActiveDemodulator(); size_t outSize = data_in->data.size();
int activeDemodChannel = -1;
// Find active demodulators if (outSize > dataOut.capacity()) {
if (runDemods.size() > 0) { dataOut.reserve(outSize);
}
if (outSize != dataOut.size()) {
dataOut.resize(outSize);
}
// channelize data DemodulatorThreadIQDataPtr demodDataOut = buffers.getBuffer();
// firpfbch output rate is (input rate / channels)
for (int i = 0, iMax = dataSize; i < iMax; i+=numChannels) { demodDataOut->frequency = frequency;
firpfbch_crcf_analyzer_execute(channelizer, &data_in->data[i], &dataOut[i]); demodDataOut->sampleRate = sampleRate;
if (demodDataOut->data.size() != outSize) {
if (demodDataOut->data.capacity() < outSize) {
demodDataOut->data.reserve(outSize);
} }
demodDataOut->data.resize(outSize);
}
for (int i = 0, iMax = numChannels+1; i < iMax; i++) { iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], data_in->data.size(), &demodDataOut->data[0]);
demodChannelActive[i] = 0;
}
// Find nearest channel for each demodulator if (runDemods.size() > 0 && iqActiveDemodVisualQueue != nullptr && !iqActiveDemodVisualQueue->full()) {
for (size_t i = 0; i < runDemods.size(); i++) { //VSO: blocking push
DemodulatorInstancePtr demod = runDemods[i]; iqActiveDemodVisualQueue->push(demodDataOut);
demodChannel[i] = getChannelAt(demod->getFrequency()); }
if (demod == activeDemod) {
activeDemodChannel = demodChannel[i];
}
}
for (size_t i = 0; i < runDemods.size(); i++) { if (iqDataOutQueue != nullptr && !iqDataOutQueue->full()) {
// cache channel usage refcounts //VSO: blocking push
if (demodChannel[i] >= 0) { iqDataOutQueue->push(demodDataOut);
demodChannelActive[demodChannel[i]]++; }
}
}
// Run channels if (iqVisualQueue != nullptr && !iqVisualQueue->full()) {
for (int i = 0; i < numChannels+1; i++) { //VSO: blocking push
int doDemodVis = ((activeDemodChannel == i) && (iqActiveDemodVisualQueue != nullptr) && !iqActiveDemodVisualQueue->full())?1:0; iqVisualQueue->push(demodDataOut);
}
if (!doDemodVis && demodChannelActive[i] == 0) { for (size_t i = 0; i < runDemods.size(); i++) {
continue; // try-push() : we do our best to only stimulate active demods, but some could happen to be dead, full, or indeed non-active.
} //so in short never block here no matter what.
if (!runDemods[i]->getIQInputDataPipe()->try_push(demodDataOut)) {
DemodulatorThreadIQDataPtr demodDataOut = buffers.getBuffer(); // std::cout << "SDRPostThread::runSingleCH() attempt to push into demod '" << runDemods[i]->getLabel()
demodDataOut->frequency = chanCenters[i]; // << "' (" << runDemods[i]->getFrequency() << " Hz) failed, demod is either too busy, not-active, or dead..." << std::endl << std::flush;
demodDataOut->sampleRate = chanBw; std::this_thread::yield();
// Calculate channel buffer size
size_t chanDataSize = (outSize/numChannels);
if (demodDataOut->data.size() != chanDataSize) {
if (demodDataOut->data.capacity() < chanDataSize) {
demodDataOut->data.reserve(chanDataSize);
}
demodDataOut->data.resize(chanDataSize);
}
int idx = i;
// Extra channel wraps lower side band of lowest channel
// to fix frequency gap on upper side of spectrum
if (i == numChannels) {
idx = (numChannels/2);
}
// prepare channel data buffer
if (i == 0) { // Channel 0 requires DC correction
if (dcBuf.size() != chanDataSize) {
dcBuf.resize(chanDataSize);
}
for (size_t j = 0; j < chanDataSize; j++) {
dcBuf[j] = dataOut[idx];
idx += numChannels;
}
iirfilt_crcf_execute_block(dcFilter, &dcBuf[0], chanDataSize, &demodDataOut->data[0]);
} else {
for (size_t j = 0; j < chanDataSize; j++) {
demodDataOut->data[j] = dataOut[idx];
idx += numChannels;
}
}
if (doDemodVis) {
//VSO: blocking push
iqActiveDemodVisualQueue->push(demodDataOut);
}
for (size_t j = 0; j < runDemods.size(); j++) {
if (demodChannel[j] == i) {
// try-push() : we do our best to only stimulate active demods, but some could happen to be dead, full, or indeed non-active.
//so in short never block here no matter what.
if (!runDemods[j]->getIQInputDataPipe()->try_push(demodDataOut)) {
// std::cout << "SDRPostThread::runPFBCH() attempt to push into demod '" << runDemods[i]->getLabel()
// << "' (" << runDemods[i]->getFrequency() << " Hz) failed, demod is either too busy, not-active, or dead..." << std::endl << std::flush;
std::this_thread::yield();
}
}
} //end for
} }
} }
} }
void SDRPostThread::initPFBChannelizer2() { // Handle active channels, channel 0 offset correction, de-interlacing and push data to demodulators
void SDRPostThread::runDemodChannels(int channelBandwidth) {
DemodulatorInstancePtr activeDemod = wxGetApp().getDemodMgr().getLastActiveDemodulator();
// Calculate channel data size
size_t chanDataSize = dataOut.size()/numChannels;
// Channel for the 'active' demod that's displaying visual data
int activeDemodChannel = -1;
for (int i = 0, iMax = numChannels+1; i < iMax; i++) {
demodChannelActive[i] = 0;
}
// Find nearest channel for each demodulator
for (size_t i = 0; i < runDemods.size(); i++) {
DemodulatorInstancePtr demod = runDemods[i];
demodChannel[i] = getChannelAt(demod->getFrequency());
if (demod == activeDemod) {
activeDemodChannel = demodChannel[i];
}
}
// Count the demods per-channel
for (size_t i = 0; i < runDemods.size(); i++) {
if (demodChannel[i] >= 0) {
demodChannelActive[demodChannel[i]]++;
}
}
// Run channels
for (int i = 0; i < numChannels+1; i++) {
bool doDemodVis = (activeDemodChannel == i) && (iqActiveDemodVisualQueue != nullptr) && !iqActiveDemodVisualQueue->full();
if (!doDemodVis && demodChannelActive[i] == 0) {
// Nothing to do for this channel? continue.
continue;
}
// Get a channel buffer
DemodulatorThreadIQDataPtr demodDataOut = buffers.getBuffer();
demodDataOut->frequency = chanCenters[i];
demodDataOut->sampleRate = channelBandwidth;
// Resize and update capacity of buffer if necessary
if (demodDataOut->data.size() != chanDataSize) {
if (demodDataOut->data.capacity() < chanDataSize) {
demodDataOut->data.reserve(chanDataSize);
}
demodDataOut->data.resize(chanDataSize);
}
// Start copying interleaved data at given channel index
int idx = i;
// Extra channel wraps left side band of lowest channel
// to fix frequency gap on right side of spectrum
if (i == numChannels) {
idx = (numChannels/2);
}
// prepare channel data buffer
if (i == 0) { // Channel 0 requires DC correction
// Update DC Buffer size if needed
if (dcBuf.size() != chanDataSize) {
dcBuf.resize(chanDataSize);
}
// Copy interleaved channel data to dc buffer
for (size_t j = 0; j < chanDataSize; j++) {
dcBuf[j] = dataOut[idx];
idx += numChannels;
}
// Run DC Filter from dcBuf to demod output buffer
iirfilt_crcf_execute_block(dcFilter, &dcBuf[0], chanDataSize, &demodDataOut->data[0]);
} else {
// Copy interleaved channel data to demod output buffer
for (size_t j = 0; j < chanDataSize; j++) {
demodDataOut->data[j] = dataOut[idx];
idx += numChannels;
}
}
if (doDemodVis) {
//VSO: blocking push
iqActiveDemodVisualQueue->push(demodDataOut);
}
for (size_t j = 0; j < runDemods.size(); j++) {
if (demodChannel[j] == i) {
// try-push() : we do our best to only stimulate active demods, but some could happen to be dead, full, or indeed non-active.
//so in short never block here no matter what.
if (!runDemods[j]->getIQInputDataPipe()->try_push(demodDataOut)) {
// std::cout << "SDRPostThread::runPFBCH() attempt to push into demod '" << runDemods[i]->getLabel()
// << "' (" << runDemods[i]->getFrequency() << " Hz) failed, demod is either too busy, not-active, or dead..." << std::endl << std::flush;
std::this_thread::yield();
}
}
} //end for
}
}
void SDRPostThread::initPFBCH() {
// std::cout << "Initializing post-process FIR polyphase filterbank channelizer with " << numChannels << " channels." << std::endl;
if (channelizer) {
firpfbch_crcf_destroy(channelizer);
}
channelizer = firpfbch_crcf_create_kaiser(LIQUID_ANALYZER, numChannels, 4, 60);
chanBw = (sampleRate / numChannels);
chanCenters.resize(numChannels+1);
demodChannelActive.resize(numChannels+1);
// std::cout << "Channel bandwidth spacing: " << (chanBw) << std::endl;
}
void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) {
bool refreshed = false;
if (numChannels != data_in->numChannels || sampleRate != data_in->sampleRate || chanMode != lastChanMode || doRefresh.load()) {
numChannels = data_in->numChannels;
sampleRate = data_in->sampleRate;
initPFBCH();
lastChanMode = 1;
refreshed = true;
}
if (refreshed || frequency != data_in->frequency) {
frequency = data_in->frequency;
updateActiveDemodulators();
updateChannels();
}
size_t outSize = data_in->data.size();
if (outSize > dataOut.capacity()) {
dataOut.reserve(outSize);
}
if (outSize != dataOut.size()) {
dataOut.resize(outSize);
}
// Find active demodulators
if (runDemods.size() > 0) {
// Channelize data
// firpfbch produces [numChannels] interleaved output samples for every [numChannels] samples
for (int i = 0, iMax = data_in->data.size(); i < iMax; i+=numChannels) {
firpfbch_crcf_analyzer_execute(channelizer, &data_in->data[i], &dataOut[i]);
}
runDemodChannels(chanBw);
}
}
void SDRPostThread::initPFBCH2() {
// std::cout << "Initializing post-process FIR polyphase filterbank channelizer with " << numChannels << " channels." << std::endl; // std::cout << "Initializing post-process FIR polyphase filterbank channelizer with " << numChannels << " channels." << std::endl;
if (channelizer2) { if (channelizer2) {
firpfbch2_crcf_destroy(channelizer2); firpfbch2_crcf_destroy(channelizer2);
@ -480,158 +481,42 @@ void SDRPostThread::initPFBChannelizer2() {
chanCenters.resize(numChannels+1); chanCenters.resize(numChannels+1);
demodChannelActive.resize(numChannels+1); demodChannelActive.resize(numChannels+1);
// std::cout << "Channel bandwidth spacing: " << (chanBw) << std::endl; // std::cout << "Channel bandwidth spacing: " << (chanBw) << std::endl;
} }
void SDRPostThread::runPFBCH2(SDRThreadIQData *data_in) { void SDRPostThread::runPFBCH2(SDRThreadIQData *data_in) {
if (numChannels != data_in->numChannels || sampleRate != data_in->sampleRate || chanMode != lastChanMode) { bool refreshed = false;
if (numChannels != data_in->numChannels || sampleRate != data_in->sampleRate || chanMode != lastChanMode || doRefresh.load()) {
numChannels = data_in->numChannels; numChannels = data_in->numChannels;
sampleRate = data_in->sampleRate; sampleRate = data_in->sampleRate;
initPFBChannelizer2(); initPFBCH2();
lastChanMode = 2; lastChanMode = 2;
doRefresh.store(true); refreshed = true;
} }
size_t dataSize = data_in->data.size(); if (refreshed || frequency != data_in->frequency) {
size_t outSize = data_in->data.size(); frequency = data_in->frequency;
updateActiveDemodulators();
updateChannels();
}
size_t outSize = data_in->data.size() * 2;
if (outSize > dataOut.capacity()) { if (outSize > dataOut.capacity()) {
dataOut.reserve(outSize); dataOut.reserve(outSize);
} }
if (outSize != dataOut.size() * 2) { if (outSize != dataOut.size()) {
dataOut.resize(outSize * 2); dataOut.resize(outSize);
} }
if (iqDataOutQueue != nullptr && !iqDataOutQueue->full()) {
DemodulatorThreadIQDataPtr iqDataOut = visualDataBuffers.getBuffer();
bool doVis = false;
if (iqVisualQueue != nullptr && !iqVisualQueue->full()) {
doVis = true;
}
iqDataOut->frequency = data_in->frequency;
iqDataOut->sampleRate = data_in->sampleRate;
iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize);
//VSO: blocking push
iqDataOutQueue->push(iqDataOut);
if (doVis) {
//VSO: blocking push
iqVisualQueue->push(iqDataOut);
}
}
if (frequency != data_in->frequency) {
frequency = data_in->frequency;
doRefresh.store(true);
}
if (doRefresh.load()) {
updateActiveDemodulators();
updateChannels();
doRefresh.store(false);
}
DemodulatorInstancePtr activeDemod = wxGetApp().getDemodMgr().getLastActiveDemodulator();
int activeDemodChannel = -1;
// Find active demodulators // Find active demodulators
if (runDemods.size() > 0) { if (runDemods.size() > 0) {
// Channelize data
// channelize data // firpfbch2 produces [numChannels] interleaved output samples for every [numChannels/2] input samples
// firpfbch2 produces M output for every M/2 input for (int i = 0, iMax = data_in->data.size(); i < iMax; i += numChannels/2) {
for (int i = 0, iMax = dataSize; i < iMax; i += numChannels/2) {
firpfbch2_crcf_execute(channelizer2, &data_in->data[i], &dataOut[i*2]); firpfbch2_crcf_execute(channelizer2, &data_in->data[i], &dataOut[i*2]);
} }
for (int i = 0, iMax = numChannels+1; i < iMax; i++) { runDemodChannels(chanBw * 2);
demodChannelActive[i] = 0;
}
// Find nearest channel for each demodulator
for (size_t i = 0; i < runDemods.size(); i++) {
DemodulatorInstancePtr demod = runDemods[i];
demodChannel[i] = getChannelAt(demod->getFrequency());
if (demod == activeDemod) {
activeDemodChannel = demodChannel[i];
}
}
for (size_t i = 0; i < runDemods.size(); i++) {
// cache channel usage refcounts
if (demodChannel[i] >= 0) {
demodChannelActive[demodChannel[i]]++;
}
}
// Run channels
for (int i = 0; i < numChannels+1; i++) {
int doDemodVis = ((activeDemodChannel == i) && (iqActiveDemodVisualQueue != nullptr) && !iqActiveDemodVisualQueue->full())?1:0;
if (!doDemodVis && demodChannelActive[i] == 0) {
continue;
}
DemodulatorThreadIQDataPtr demodDataOut = buffers.getBuffer();
demodDataOut->frequency = chanCenters[i];
demodDataOut->sampleRate = chanBw * 2;
// Calculate channel buffer size
size_t chanDataSize = (outSize/numChannels);
if (demodDataOut->data.size() != chanDataSize * 2) {
if (demodDataOut->data.capacity() < chanDataSize * 2) {
demodDataOut->data.reserve(chanDataSize * 2);
}
demodDataOut->data.resize(chanDataSize * 2);
}
int idx = i;
// Extra channel wraps lower side band of lowest channel
// to fix frequency gap on upper side of spectrum
if (i == numChannels) {
idx = (numChannels/2);
}
// prepare channel data buffer
if (i == 0) { // Channel 0 requires DC correction
if (dcBuf.size() != chanDataSize * 2) {
dcBuf.resize(chanDataSize * 2);
}
for (size_t j = 0; j < chanDataSize*2; j++) {
dcBuf[j] = dataOut[idx];
idx += numChannels;
}
iirfilt_crcf_execute_block(dcFilter, &dcBuf[0], chanDataSize * 2, &demodDataOut->data[0]);
} else {
for (size_t j = 0; j < chanDataSize*2; j++) {
demodDataOut->data[j] = dataOut[idx];
idx += numChannels;
}
}
if (doDemodVis) {
//VSO: blocking push
iqActiveDemodVisualQueue->push(demodDataOut);
}
for (size_t j = 0; j < runDemods.size(); j++) {
if (demodChannel[j] == i) {
// try-push() : we do our best to only stimulate active demods, but some could happen to be dead, full, or indeed non-active.
//so in short never block here no matter what.
if (!runDemods[j]->getIQInputDataPipe()->try_push(demodDataOut)) {
// std::cout << "SDRPostThread::runPFBCH() attempt to push into demod '" << runDemods[i]->getLabel()
// << "' (" << runDemods[i]->getFrequency() << " Hz) failed, demod is either too busy, not-active, or dead..." << std::endl << std::flush;
std::this_thread::yield();
}
}
} //end for
}
} }
} }

View File

@ -7,8 +7,8 @@
#include <algorithm> #include <algorithm>
enum SDRPostThreadChannelizerType { enum SDRPostThreadChannelizerType {
SDRPostThreadCh = 1, SDRPostPFBCH = 1,
SDRPostThreadCh2 = 2 SDRPostPFBCH2 = 2
}; };
class SDRPostThread : public IOThread { class SDRPostThread : public IOThread {
@ -21,11 +21,8 @@ public:
virtual void run(); virtual void run();
virtual void terminate(); virtual void terminate();
void pushVisualData(SDRThreadIQData *data_in); void resetAllDemodulators();
void runSingleCH(SDRThreadIQData *data_in);
void runPFBCH(SDRThreadIQData *data_in);
void runPFBCH2(SDRThreadIQData *data_in);
void setIQVisualRange(long long frequency, int bandwidth);
void setChannelizerType(SDRPostThreadChannelizerType chType); void setChannelizerType(SDRPostThreadChannelizerType chType);
SDRPostThreadChannelizerType getChannelizerType(); SDRPostThreadChannelizerType getChannelizerType();
@ -38,16 +35,22 @@ protected:
private: private:
void initPFBChannelizer(); void pushVisualData(SDRThreadIQData *data_in);
void initPFBChannelizer2(); void runSingleCH(SDRThreadIQData *data_in);
void runDemodChannels(int channelBandwidth);
void initPFBCH();
void runPFBCH(SDRThreadIQData *data_in);
void initPFBCH2();
void runPFBCH2(SDRThreadIQData *data_in);
void updateActiveDemodulators(); void updateActiveDemodulators();
void updateChannels(); void updateChannels();
int getChannelAt(long long frequency); int getChannelAt(long long frequency);
void resetAllDemodulators();
ReBuffer<DemodulatorThreadIQData> buffers; ReBuffer<DemodulatorThreadIQData> buffers;
std::vector<liquid_float_complex> fpData;
std::vector<liquid_float_complex> dataOut; std::vector<liquid_float_complex> dataOut;
std::vector<long long> chanCenters; std::vector<long long> chanCenters;
long long chanBw = 0; long long chanBw = 0;
@ -58,8 +61,6 @@ private:
ReBuffer<DemodulatorThreadIQData> visualDataBuffers; ReBuffer<DemodulatorThreadIQData> visualDataBuffers;
atomic_bool doRefresh; atomic_bool doRefresh;
atomic_llong visFrequency;
atomic_int visBandwidth;
atomic_int chanMode; atomic_int chanMode;
int numChannels, sampleRate, lastChanMode; int numChannels, sampleRate, lastChanMode;