COMMENTS,POLISHING: comments fenzy around VisualProcessor machinery,

make process() a true interface as strong hint for derived classes,
plus misc define added for understanding.

BUGFIX: FFTDataDistributor loses incoming samples when compacting internal buffers.

BUGFIX2: FFTDistributor: Frozen Waterfall if internal buffer is no bigger than fftSize
This commit is contained in:
vsonnier
2017-02-04 10:32:35 +01:00
parent 0815b6a684
commit 4609386648
13 changed files with 137 additions and 64 deletions
+48 -14
View File
@@ -2,13 +2,15 @@
// SPDX-License-Identifier: GPL-2.0+
#include "FFTDataDistributor.h"
#include <algorithm>
FFTDataDistributor::FFTDataDistributor() : outputBuffers("FFTDataDistributorBuffers"), fftSize(DEFAULT_FFT_SIZE), linesPerSecond(DEFAULT_WATERFALL_LPS), lineRateAccum(0.0) {
}
void FFTDataDistributor::setFFTSize(unsigned int fftSize) {
this->fftSize = fftSize;
void FFTDataDistributor::setFFTSize(unsigned int size) {
fftSize.store(size);
}
void FFTDataDistributor::setLinesPerSecond(unsigned int lines) {
@@ -29,25 +31,50 @@ void FFTDataDistributor::process() {
input->pop(inp);
if (inp) {
//Settings have changed, set new values and dump all previous samples stored in inputBuffer:
if (inputBuffer.sampleRate != inp->sampleRate || inputBuffer.frequency != inp->frequency) {
bufferMax = inp->sampleRate / 4;
//bufferMax must be at least fftSize (+ margin), else the waterfall get frozen, because no longer updated.
bufferMax = std::max((size_t)(inp->sampleRate * FFT_DISTRIBUTOR_BUFFER_IN_SECONDS), (size_t)(1.2 * fftSize.load()));
// std::cout << "Buffer Max: " << bufferMax << std::endl;
bufferOffset = 0;
bufferedItems = 0;
inputBuffer.sampleRate = inp->sampleRate;
inputBuffer.frequency = inp->frequency;
inputBuffer.data.resize(bufferMax);
}
//adjust (bufferMax ; inputBuffer.data) in case of FFT size change only.
if (bufferMax < (size_t)(1.2 * fftSize.load())) {
bufferMax = (size_t)(1.2 * fftSize.load());
inputBuffer.data.resize(bufferMax);
}
size_t nbSamplesToAdd = inp->data.size();
//No room left in inputBuffer.data to accept inp->data.size() more samples.
//so make room by sliding left of bufferOffset, which is fine because
//those samples has already been processed.
if ((bufferOffset + bufferedItems + inp->data.size()) > bufferMax) {
memmove(&inputBuffer.data[0], &inputBuffer.data[bufferOffset], bufferedItems*sizeof(liquid_float_complex));
bufferOffset = 0;
} else {
memcpy(&inputBuffer.data[bufferOffset+bufferedItems],&inp->data[0],inp->data.size()*sizeof(liquid_float_complex));
bufferedItems += inp->data.size();
//if there are too much samples, we may even overflow !
//as a fallback strategy, drop the last incomming new samples not fitting in inputBuffer.data.
if (bufferedItems + inp->data.size() > bufferMax) {
//clamp nbSamplesToAdd
nbSamplesToAdd = bufferMax - bufferedItems;
std::cout << "FFTDataDistributor::process() incoming samples overflow, dropping the last " << (inp->data.size() - nbSamplesToAdd) << " input samples..." << std::endl;
}
}
//store nbSamplesToAdd incoming samples.
memcpy(&inputBuffer.data[bufferOffset+bufferedItems],&inp->data[0], nbSamplesToAdd *sizeof(liquid_float_complex));
bufferedItems += nbSamplesToAdd;
//
inp->decRefCount();
} else {
//empty inp, wait for another.
continue;
}
@@ -56,12 +83,14 @@ void FFTDataDistributor::process() {
// number of lines in input
double inputLines = (double)bufferedItems / (double)fftSize;
// ratio required to achieve the desired rate
// ratio required to achieve the desired rate:
// it means we can achieive 'lineRateStep' times the target linesPerSecond.
// < 1 means we cannot reach it by lack of samples.
double lineRateStep = ((double)linesPerSecond * inputTime)/(double)inputLines;
//we have enough samples to FFT at least one 'line' of 'fftSize' frequencies for display:
if (bufferedItems >= fftSize) {
size_t numProcessed = 0;
if (lineRateAccum + (lineRateStep * ((double)bufferedItems/(double)fftSize)) < 1.0) {
// move along, nothing to see here..
lineRateAccum += (lineRateStep * ((double)bufferedItems/(double)fftSize));
@@ -74,10 +103,12 @@ void FFTDataDistributor::process() {
lineRateAccum += lineRateStep;
if (lineRateAccum >= 1.0) {
//each i represents a FFT computation
DemodulatorThreadIQData *outp = outputBuffers.getBuffer();
outp->frequency = inputBuffer.frequency;
outp->sampleRate = inputBuffer.sampleRate;
outp->data.assign(inputBuffer.data.begin()+bufferOffset+i,inputBuffer.data.begin()+bufferOffset+i+fftSize);
outp->data.assign(inputBuffer.data.begin()+bufferOffset+i,
inputBuffer.data.begin()+bufferOffset+i+ fftSize);
distribute(outp);
while (lineRateAccum >= 1.0) {
@@ -86,16 +117,19 @@ void FFTDataDistributor::process() {
}
numProcessed += fftSize;
}
} //end for
}
//advance bufferOffset read pointer,
//reduce size of bufferedItems.
if (numProcessed) {
bufferedItems -= numProcessed;
bufferOffset += numProcessed;
}
//clamp to zero the number of remaining items.
if (bufferedItems <= 0) {
bufferedItems = 0;
bufferOffset = 0;
}
}
}
} //end if bufferedItems >= fftSize
} //en while
}
+5 -3
View File
@@ -7,20 +7,22 @@
#include "DemodDefs.h"
#include <cmath>
#include <cstring>
#include <atomic>
class FFTDataDistributor : public VisualProcessor<DemodulatorThreadIQData, DemodulatorThreadIQData> {
public:
FFTDataDistributor();
void setFFTSize(unsigned int fftSize);
void setFFTSize(unsigned int size);
void setLinesPerSecond(unsigned int lines);
unsigned int getLinesPerSecond();
protected:
void process();
virtual void process();
DemodulatorThreadIQData inputBuffer, tempBuffer;
ReBuffer<DemodulatorThreadIQData> outputBuffers;
unsigned int fftSize;
std::atomic<unsigned int> fftSize;
unsigned int linesPerSecond;
double lineRateAccum;
size_t bufferMax = 0;
+16 -3
View File
@@ -30,10 +30,18 @@ void FFTVisualDataThread::run() {
DemodulatorThreadInputQueue *pipeIQDataIn = static_cast<DemodulatorThreadInputQueue *>(getInputQueue("IQDataInput"));
SpectrumVisualDataQueue *pipeFFTDataOut = static_cast<SpectrumVisualDataQueue *>(getOutputQueue("FFTDataOutput"));
fftQueue.set_max_num_items(100);
fftQueue.set_max_num_items(100);
pipeFFTDataOut->set_max_num_items(100);
//FFT distributor plumbing:
// IQDataInput push samples to process to FFT Data distributor.
fftDistrib.setInput(pipeIQDataIn);
//The FFT distributor has actually 1 output only, so it doesn't distribute at all :)
fftDistrib.attachOutput(&fftQueue);
//FFT Distributor output is ==> SpectrumVisualProcessor input.
wproc.setInput(&fftQueue);
wproc.attachOutput(pipeFFTDataOut);
wproc.setup(DEFAULT_FFT_SIZE);
@@ -42,7 +50,9 @@ void FFTVisualDataThread::run() {
while(!stopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
//this if fed by FFTDataDistributor which has a buffer of FFT_DISTRIBUTOR_BUFFER_IN_SECONDS
//so sleep for << FFT_DISTRIBUTOR_BUFFER_IN_SECONDS not to be overflown
std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0)));
// std::this_thread::yield();
int fftSize = wproc.getDesiredInputSize();
@@ -59,8 +69,11 @@ void FFTVisualDataThread::run() {
lpsChanged.store(false);
}
//Make FFT Distributor process IQ samples
//and package them into ready-to-FFT sample sets (representing 1 line) by wproc
fftDistrib.run();
// Make wproc do a FFT of each of the sample sets provided by fftDistrib:
while (!wproc.isInputEmpty()) {
wproc.run();
}
+1 -1
View File
@@ -11,7 +11,7 @@ ScopeVisualProcessor::ScopeVisualProcessor(): outputBuffers("ScopeVisualProcesso
fft_average_rate = 0.65f;
fft_ceil_ma = fft_ceil_maa = 0;
fft_floor_ma = fft_floor_maa = 0;
maxScopeSamples = 1024;
maxScopeSamples = DEFAULT_DMOD_FFT_SIZE;
fftPlan = nullptr;
}
+1 -1
View File
@@ -29,7 +29,7 @@ public:
void setScopeEnabled(bool scopeEnable);
void setSpectrumEnabled(bool spectrumEnable);
protected:
void process();
virtual void process();
ReBuffer<ScopeRenderData> outputBuffers;
std::atomic_bool scopeEnabled;
+4 -3
View File
@@ -16,11 +16,12 @@ SpectrumVisualProcessor *SpectrumVisualDataThread::getProcessor() {
}
void SpectrumVisualDataThread::run() {
// std::cout << "Spectrum visual data thread started." << std::endl;
while(!stopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// std::this_thread::yield();
//this if fed by FFTDataDistributor which has a buffer of FFT_DISTRIBUTOR_BUFFER_IN_SECONDS
//so sleep for << FFT_DISTRIBUTOR_BUFFER_IN_SECONDS not to be overflown
std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0)));
sproc.run();
}
+1 -1
View File
@@ -53,7 +53,7 @@ public:
float getScaleFactor();
protected:
void process();
virtual void process();
ReBuffer<SpectrumVisualData> outputBuffers;
std::atomic_bool is_view;
+37 -19
View File
@@ -43,19 +43,22 @@ public:
return false;
}
//Set a (new) 'input' queue for incoming data.
void setInput(ThreadQueue<InputDataType *> *vis_in) {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
input = vis_in;
}
//Add a vis_out queue where to consumed 'input' data will be
//dispatched by distribute().
void attachOutput(ThreadQueue<OutputDataType *> *vis_out) {
// attach an output queue
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
outputs.push_back(vis_out);
}
//reverse of attachOutput(), removed an existing attached vis_out.
void removeOutput(ThreadQueue<OutputDataType *> *vis_out) {
// remove an output queue
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
@@ -64,9 +67,9 @@ public:
if (i != outputs.end()) {
outputs.erase(i);
}
}
//Call process() repeateadly until all available 'input' data is consumed.
void run() {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
@@ -78,37 +81,51 @@ public:
}
protected:
virtual void process() {
// process inputs to output
// distribute(output);
}
// derived class must implement a process() interface
//where typically 'input' data is consummed, procerssed, and then dispatched
//with distribute() to all 'outputs'.
virtual void process() = 0;
void distribute(OutputDataType *output) {
// distribute outputs
//To be used by derived classes implementing
//process() : will dispatch 'item' into as many
//available outputs, previously set by attachOutput().
void distribute(OutputDataType *item) {
std::lock_guard < std::recursive_mutex > busy_lock(busy_update);
output->setRefCount((int)outputs.size());
//We will try to distribute 'output' among all 'outputs',
//so 'output' will a-priori be shared among all 'outputs' so set its ref count to this
//amount.
item->setRefCount((int)outputs.size());
for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) {
if (!(*outputs_i)->push(output)) {
output->decRefCount();
//if 'output' failed to be given to an outputs_i, dec its ref count accordingly.
if (!(*outputs_i)->push(item)) {
item->decRefCount();
}
}
// Now 'item' refcount matches the times 'item' has been successfully distributed,
//i.e shared among the outputs.
}
//the incoming data queue
ThreadQueue<InputDataType *> *input = nullptr;
//the n-outputs where to process()-ed data is distribute()-ed.
std::vector<ThreadQueue<OutputDataType *> *> outputs;
typename std::vector<ThreadQueue<OutputDataType *> *>::iterator outputs_i;
typename std::vector<ThreadQueue<OutputDataType *> *>::iterator outputs_i;
//protects input and outputs, must be recursive because ao reentrance
//protects input and outputs, must be recursive because of re-entrance
std::recursive_mutex busy_update;
};
//Specialization much like VisualDataReDistributor, except
//the input (pointer) is directly re-dispatched
//to outputs, so that all output indeed SHARE the same instance.
template<class OutputDataType = ReferenceCounter>
class VisualDataDistributor : public VisualProcessor<OutputDataType, OutputDataType> {
protected:
void process() {
virtual void process() {
OutputDataType *inp;
while (VisualProcessor<OutputDataType, OutputDataType>::input->try_pop(inp)) {
@@ -126,11 +143,12 @@ protected:
}
};
//specialization class which process() take an input item and re-dispatch
//A COPY to every outputs, without further processing. This is a 1-to-n dispatcher.
template<class OutputDataType = ReferenceCounter>
class VisualDataReDistributor : public VisualProcessor<OutputDataType, OutputDataType> {
protected:
void process() {
virtual void process() {
OutputDataType *inp;
while (VisualProcessor<OutputDataType, OutputDataType>::input->try_pop(inp)) {