diff --git a/src/AppFrame.cpp b/src/AppFrame.cpp index b861856..ab5b567 100644 --- a/src/AppFrame.cpp +++ b/src/AppFrame.cpp @@ -164,7 +164,7 @@ AppFrame::AppFrame() : #if CUBICSDR_ENABLE_VIEW_DEMOD wxBoxSizer *demodVisuals = new wxBoxSizer(wxVERTICAL); - wxGetApp().getDemodSpectrumProcessor()->setup(1024); + wxGetApp().getDemodSpectrumProcessor()->setup(DEFAULT_DMOD_FFT_SIZE); demodSpectrumCanvas = new SpectrumCanvas(demodPanel, attribList); demodSpectrumCanvas->setView(wxGetApp().getConfig()->getCenterFreq(), 300000); demodVisuals->Add(demodSpectrumCanvas, 3, wxEXPAND | wxALL, 0); @@ -173,7 +173,7 @@ AppFrame::AppFrame() : demodVisuals->AddSpacer(1); demodWaterfallCanvas = new WaterfallCanvas(demodPanel, attribList); - demodWaterfallCanvas->setup(1024, 128); + demodWaterfallCanvas->setup(DEFAULT_DMOD_FFT_SIZE, DEFAULT_DEMOD_WATERFALL_LINES_NB); demodWaterfallCanvas->setView(wxGetApp().getConfig()->getCenterFreq(), 300000); demodWaterfallCanvas->attachSpectrumCanvas(demodSpectrumCanvas); demodWaterfallCanvas->setMinBandwidth(8000); @@ -209,7 +209,7 @@ AppFrame::AppFrame() : scopeCanvas->setHelpTip("Audio Visuals, drag left/right to toggle Scope or Spectrum, 'B' to toggle decibels display."); scopeCanvas->SetMinSize(wxSize(128,-1)); demodScopeTray->Add(scopeCanvas, 8, wxEXPAND | wxALL, 0); - wxGetApp().getScopeProcessor()->setup(1024); + wxGetApp().getScopeProcessor()->setup(DEFAULT_SCOPE_FFT_SIZE); wxGetApp().getScopeProcessor()->attachOutput(scopeCanvas->getInputQueue()); demodScopeTray->AddSpacer(1); @@ -293,7 +293,7 @@ AppFrame::AppFrame() : wxPanel *spectrumPanel = new wxPanel(mainVisSplitter, wxID_ANY); wxBoxSizer *spectrumSizer = new wxBoxSizer(wxHORIZONTAL); - wxGetApp().getSpectrumProcessor()->setup(2048); + wxGetApp().getSpectrumProcessor()->setup(DEFAULT_FFT_SIZE); spectrumCanvas = new SpectrumCanvas(spectrumPanel, attribList); spectrumCanvas->setShowDb(true); spectrumCanvas->setUseDBOfs(true); @@ -336,7 +336,7 @@ AppFrame::AppFrame() : wxBoxSizer *wfSizer = new wxBoxSizer(wxHORIZONTAL); waterfallCanvas = new WaterfallCanvas(waterfallPanel, attribList); - waterfallCanvas->setup(2048, 512); + waterfallCanvas->setup(DEFAULT_FFT_SIZE, DEFAULT_MAIN_WATERFALL_LINES_NB); waterfallDataThread = new FFTVisualDataThread(); @@ -1025,17 +1025,6 @@ void AppFrame::OnMenu(wxCommandEvent& event) { lowPerfMode = lowPerfMenuItem->IsChecked(); wxGetApp().getConfig()->setLowPerfMode(lowPerfMode); -// long srate = wxGetApp().getSampleRate(); -// if (srate > CHANNELIZER_RATE_MAX && lowPerfMode) { -// if (wxGetApp().getSpectrumProcessor()->getFFTSize() != 1024) { -// setMainWaterfallFFTSize(1024); -// } -// } else if (srate > CHANNELIZER_RATE_MAX) { -// if (wxGetApp().getSpectrumProcessor()->getFFTSize() != 2048) { -// setMainWaterfallFFTSize(2048); -// } -// } - } else if (event.GetId() == wxID_SET_TIPS ) { if (wxGetApp().getConfig()->getShowTips()) { wxGetApp().getConfig()->setShowTips(false); diff --git a/src/CubicSDR.cpp b/src/CubicSDR.cpp index 18afff8..a2aa0e8 100644 --- a/src/CubicSDR.cpp +++ b/src/CubicSDR.cpp @@ -635,15 +635,15 @@ void CubicSDR::setSampleRate(long long rate_in) { setFrequency(frequency); if (rate_in <= CHANNELIZER_RATE_MAX / 8) { - appframe->setMainWaterfallFFTSize(512); + appframe->setMainWaterfallFFTSize(DEFAULT_FFT_SIZE / 4); appframe->getWaterfallDataThread()->getProcessor()->setHideDC(false); spectrumVisualThread->getProcessor()->setHideDC(false); } else if (rate_in <= CHANNELIZER_RATE_MAX) { - appframe->setMainWaterfallFFTSize(1024); + appframe->setMainWaterfallFFTSize(DEFAULT_FFT_SIZE / 2); appframe->getWaterfallDataThread()->getProcessor()->setHideDC(false); spectrumVisualThread->getProcessor()->setHideDC(false); } else if (rate_in > CHANNELIZER_RATE_MAX) { - appframe->setMainWaterfallFFTSize(2048); + appframe->setMainWaterfallFFTSize(DEFAULT_FFT_SIZE); appframe->getWaterfallDataThread()->getProcessor()->setHideDC(true); spectrumVisualThread->getProcessor()->setHideDC(true); } diff --git a/src/CubicSDRDefs.h b/src/CubicSDRDefs.h index 17980bd..17387eb 100644 --- a/src/CubicSDRDefs.h +++ b/src/CubicSDRDefs.h @@ -31,7 +31,15 @@ const char filePathSeparator = #define BUF_SIZE (16384*6) #define DEFAULT_SAMPLE_RATE 2500000 + +// #define DEFAULT_FFT_SIZE 2048 +#define DEFAULT_DMOD_FFT_SIZE (DEFAULT_FFT_SIZE / 2) +#define DEFAULT_SCOPE_FFT_SIZE (DEFAULT_FFT_SIZE / 2) + +//Both must be a power of 2 to prevent terrible OpenGL performance. +#define DEFAULT_MAIN_WATERFALL_LINES_NB 512 +#define DEFAULT_DEMOD_WATERFALL_LINES_NB 128 #define DEFAULT_DEMOD_TYPE "FM" #define DEFAULT_DEMOD_BW 200000 @@ -43,3 +51,5 @@ const char filePathSeparator = #define MANUAL_SAMPLE_RATE_MIN 2000000 // 2MHz #define MANUAL_SAMPLE_RATE_MAX 200000000 // 200MHz (We are 2017+ after all) +//Represents the amount of time to process in the FFT distributor. +#define FFT_DISTRIBUTOR_BUFFER_IN_SECONDS 0.250 diff --git a/src/demod/DemodDefs.h b/src/demod/DemodDefs.h index ac1b2f1..043ad41 100644 --- a/src/demod/DemodDefs.h +++ b/src/demod/DemodDefs.h @@ -59,6 +59,7 @@ class ModemKit; class DemodulatorThreadPostIQData: public ReferenceCounter { public: std::vector data; + long long sampleRate; std::string modemName; std::string modemType; diff --git a/src/panel/WaterfallPanel.cpp b/src/panel/WaterfallPanel.cpp index a3c225b..20b6fa1 100644 --- a/src/panel/WaterfallPanel.cpp +++ b/src/panel/WaterfallPanel.cpp @@ -39,6 +39,7 @@ void WaterfallPanel::refreshTheme() { void WaterfallPanel::setPoints(std::vector &points) { size_t halfPts = points.size()/2; if (halfPts == fft_size) { + for (unsigned int i = 0; i < fft_size; i++) { this->points[i] = points[i*2+1]; } @@ -102,6 +103,10 @@ void WaterfallPanel::update() { unsigned char *waterfall_tex; + //Creates 2x 2D textures into card memory. + //of size half_fft_size * waterfall_lines, which can be BIG. + //The limit of the size of Waterfall is the size of the maximum supported 2D texture + //by the graphic card. (half_fft_size * waterfall_lines, i.e DEFAULT_DEMOD_WATERFALL_LINES_NB * DEFAULT_FFT_SIZE/2) waterfall_tex = new unsigned char[half_fft_size * waterfall_lines]; memset(waterfall_tex, 0, half_fft_size * waterfall_lines); diff --git a/src/process/FFTDataDistributor.cpp b/src/process/FFTDataDistributor.cpp index 01da328..7a3c66d 100644 --- a/src/process/FFTDataDistributor.cpp +++ b/src/process/FFTDataDistributor.cpp @@ -2,13 +2,15 @@ // SPDX-License-Identifier: GPL-2.0+ #include "FFTDataDistributor.h" +#include FFTDataDistributor::FFTDataDistributor() : outputBuffers("FFTDataDistributorBuffers"), fftSize(DEFAULT_FFT_SIZE), linesPerSecond(DEFAULT_WATERFALL_LPS), lineRateAccum(0.0) { } -void FFTDataDistributor::setFFTSize(unsigned int fftSize) { - this->fftSize = fftSize; +void FFTDataDistributor::setFFTSize(unsigned int size) { + + fftSize.store(size); } void FFTDataDistributor::setLinesPerSecond(unsigned int lines) { @@ -29,25 +31,50 @@ void FFTDataDistributor::process() { input->pop(inp); if (inp) { + //Settings have changed, set new values and dump all previous samples stored in inputBuffer: if (inputBuffer.sampleRate != inp->sampleRate || inputBuffer.frequency != inp->frequency) { - - bufferMax = inp->sampleRate / 4; + + //bufferMax must be at least fftSize (+ margin), else the waterfall get frozen, because no longer updated. + bufferMax = std::max((size_t)(inp->sampleRate * FFT_DISTRIBUTOR_BUFFER_IN_SECONDS), (size_t)(1.2 * fftSize.load())); + // std::cout << "Buffer Max: " << bufferMax << std::endl; bufferOffset = 0; - + bufferedItems = 0; inputBuffer.sampleRate = inp->sampleRate; inputBuffer.frequency = inp->frequency; inputBuffer.data.resize(bufferMax); } + + //adjust (bufferMax ; inputBuffer.data) in case of FFT size change only. + if (bufferMax < (size_t)(1.2 * fftSize.load())) { + bufferMax = (size_t)(1.2 * fftSize.load()); + inputBuffer.data.resize(bufferMax); + } + + size_t nbSamplesToAdd = inp->data.size(); + + //No room left in inputBuffer.data to accept inp->data.size() more samples. + //so make room by sliding left of bufferOffset, which is fine because + //those samples has already been processed. if ((bufferOffset + bufferedItems + inp->data.size()) > bufferMax) { memmove(&inputBuffer.data[0], &inputBuffer.data[bufferOffset], bufferedItems*sizeof(liquid_float_complex)); bufferOffset = 0; - } else { - memcpy(&inputBuffer.data[bufferOffset+bufferedItems],&inp->data[0],inp->data.size()*sizeof(liquid_float_complex)); - bufferedItems += inp->data.size(); + //if there are too much samples, we may even overflow ! + //as a fallback strategy, drop the last incomming new samples not fitting in inputBuffer.data. + if (bufferedItems + inp->data.size() > bufferMax) { + //clamp nbSamplesToAdd + nbSamplesToAdd = bufferMax - bufferedItems; + std::cout << "FFTDataDistributor::process() incoming samples overflow, dropping the last " << (inp->data.size() - nbSamplesToAdd) << " input samples..." << std::endl; + } } + + //store nbSamplesToAdd incoming samples. + memcpy(&inputBuffer.data[bufferOffset+bufferedItems],&inp->data[0], nbSamplesToAdd *sizeof(liquid_float_complex)); + bufferedItems += nbSamplesToAdd; + // inp->decRefCount(); } else { + //empty inp, wait for another. continue; } @@ -56,12 +83,14 @@ void FFTDataDistributor::process() { // number of lines in input double inputLines = (double)bufferedItems / (double)fftSize; - // ratio required to achieve the desired rate + // ratio required to achieve the desired rate: + // it means we can achieive 'lineRateStep' times the target linesPerSecond. + // < 1 means we cannot reach it by lack of samples. double lineRateStep = ((double)linesPerSecond * inputTime)/(double)inputLines; + //we have enough samples to FFT at least one 'line' of 'fftSize' frequencies for display: if (bufferedItems >= fftSize) { size_t numProcessed = 0; - if (lineRateAccum + (lineRateStep * ((double)bufferedItems/(double)fftSize)) < 1.0) { // move along, nothing to see here.. lineRateAccum += (lineRateStep * ((double)bufferedItems/(double)fftSize)); @@ -74,10 +103,12 @@ void FFTDataDistributor::process() { lineRateAccum += lineRateStep; if (lineRateAccum >= 1.0) { + //each i represents a FFT computation DemodulatorThreadIQData *outp = outputBuffers.getBuffer(); outp->frequency = inputBuffer.frequency; outp->sampleRate = inputBuffer.sampleRate; - outp->data.assign(inputBuffer.data.begin()+bufferOffset+i,inputBuffer.data.begin()+bufferOffset+i+fftSize); + outp->data.assign(inputBuffer.data.begin()+bufferOffset+i, + inputBuffer.data.begin()+bufferOffset+i+ fftSize); distribute(outp); while (lineRateAccum >= 1.0) { @@ -86,16 +117,19 @@ void FFTDataDistributor::process() { } numProcessed += fftSize; - } + } //end for } + //advance bufferOffset read pointer, + //reduce size of bufferedItems. if (numProcessed) { bufferedItems -= numProcessed; bufferOffset += numProcessed; } + //clamp to zero the number of remaining items. if (bufferedItems <= 0) { bufferedItems = 0; bufferOffset = 0; } - } - } + } //end if bufferedItems >= fftSize + } //en while } diff --git a/src/process/FFTDataDistributor.h b/src/process/FFTDataDistributor.h index d924779..9f60288 100644 --- a/src/process/FFTDataDistributor.h +++ b/src/process/FFTDataDistributor.h @@ -7,20 +7,22 @@ #include "DemodDefs.h" #include #include +#include class FFTDataDistributor : public VisualProcessor { public: FFTDataDistributor(); - void setFFTSize(unsigned int fftSize); + void setFFTSize(unsigned int size); void setLinesPerSecond(unsigned int lines); unsigned int getLinesPerSecond(); protected: - void process(); + virtual void process(); DemodulatorThreadIQData inputBuffer, tempBuffer; ReBuffer outputBuffers; - unsigned int fftSize; + std::atomic fftSize; + unsigned int linesPerSecond; double lineRateAccum; size_t bufferMax = 0; diff --git a/src/process/FFTVisualDataThread.cpp b/src/process/FFTVisualDataThread.cpp index ddd3386..44e303d 100644 --- a/src/process/FFTVisualDataThread.cpp +++ b/src/process/FFTVisualDataThread.cpp @@ -30,10 +30,18 @@ void FFTVisualDataThread::run() { DemodulatorThreadInputQueue *pipeIQDataIn = static_cast(getInputQueue("IQDataInput")); SpectrumVisualDataQueue *pipeFFTDataOut = static_cast(getOutputQueue("FFTDataOutput")); - fftQueue.set_max_num_items(100); + + fftQueue.set_max_num_items(100); pipeFFTDataOut->set_max_num_items(100); + + //FFT distributor plumbing: + // IQDataInput push samples to process to FFT Data distributor. fftDistrib.setInput(pipeIQDataIn); + + //The FFT distributor has actually 1 output only, so it doesn't distribute at all :) fftDistrib.attachOutput(&fftQueue); + + //FFT Distributor output is ==> SpectrumVisualProcessor input. wproc.setInput(&fftQueue); wproc.attachOutput(pipeFFTDataOut); wproc.setup(DEFAULT_FFT_SIZE); @@ -42,7 +50,9 @@ void FFTVisualDataThread::run() { while(!stopping) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + //this if fed by FFTDataDistributor which has a buffer of FFT_DISTRIBUTOR_BUFFER_IN_SECONDS + //so sleep for << FFT_DISTRIBUTOR_BUFFER_IN_SECONDS not to be overflown + std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0))); // std::this_thread::yield(); int fftSize = wproc.getDesiredInputSize(); @@ -59,8 +69,11 @@ void FFTVisualDataThread::run() { lpsChanged.store(false); } + //Make FFT Distributor process IQ samples + //and package them into ready-to-FFT sample sets (representing 1 line) by wproc fftDistrib.run(); - + + // Make wproc do a FFT of each of the sample sets provided by fftDistrib: while (!wproc.isInputEmpty()) { wproc.run(); } diff --git a/src/process/ScopeVisualProcessor.cpp b/src/process/ScopeVisualProcessor.cpp index 519f793..5b83b07 100644 --- a/src/process/ScopeVisualProcessor.cpp +++ b/src/process/ScopeVisualProcessor.cpp @@ -11,7 +11,7 @@ ScopeVisualProcessor::ScopeVisualProcessor(): outputBuffers("ScopeVisualProcesso fft_average_rate = 0.65f; fft_ceil_ma = fft_ceil_maa = 0; fft_floor_ma = fft_floor_maa = 0; - maxScopeSamples = 1024; + maxScopeSamples = DEFAULT_DMOD_FFT_SIZE; fftPlan = nullptr; } diff --git a/src/process/ScopeVisualProcessor.h b/src/process/ScopeVisualProcessor.h index c079911..de0732c 100644 --- a/src/process/ScopeVisualProcessor.h +++ b/src/process/ScopeVisualProcessor.h @@ -29,7 +29,7 @@ public: void setScopeEnabled(bool scopeEnable); void setSpectrumEnabled(bool spectrumEnable); protected: - void process(); + virtual void process(); ReBuffer outputBuffers; std::atomic_bool scopeEnabled; diff --git a/src/process/SpectrumVisualDataThread.cpp b/src/process/SpectrumVisualDataThread.cpp index bfe1600..d29796f 100644 --- a/src/process/SpectrumVisualDataThread.cpp +++ b/src/process/SpectrumVisualDataThread.cpp @@ -16,11 +16,12 @@ SpectrumVisualProcessor *SpectrumVisualDataThread::getProcessor() { } void SpectrumVisualDataThread::run() { -// std::cout << "Spectrum visual data thread started." << std::endl; while(!stopping) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); -// std::this_thread::yield(); + //this if fed by FFTDataDistributor which has a buffer of FFT_DISTRIBUTOR_BUFFER_IN_SECONDS + //so sleep for << FFT_DISTRIBUTOR_BUFFER_IN_SECONDS not to be overflown + std::this_thread::sleep_for(std::chrono::milliseconds((int)(FFT_DISTRIBUTOR_BUFFER_IN_SECONDS * 1000.0 / 25.0))); + sproc.run(); } diff --git a/src/process/SpectrumVisualProcessor.h b/src/process/SpectrumVisualProcessor.h index 0edd764..44467c2 100644 --- a/src/process/SpectrumVisualProcessor.h +++ b/src/process/SpectrumVisualProcessor.h @@ -53,7 +53,7 @@ public: float getScaleFactor(); protected: - void process(); + virtual void process(); ReBuffer outputBuffers; std::atomic_bool is_view; diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index 44310ac..bd41d40 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -43,19 +43,22 @@ public: return false; } + //Set a (new) 'input' queue for incoming data. void setInput(ThreadQueue *vis_in) { std::lock_guard < std::recursive_mutex > busy_lock(busy_update); input = vis_in; } + //Add a vis_out queue where to consumed 'input' data will be + //dispatched by distribute(). void attachOutput(ThreadQueue *vis_out) { // attach an output queue std::lock_guard < std::recursive_mutex > busy_lock(busy_update); outputs.push_back(vis_out); - } + //reverse of attachOutput(), removed an existing attached vis_out. void removeOutput(ThreadQueue *vis_out) { // remove an output queue std::lock_guard < std::recursive_mutex > busy_lock(busy_update); @@ -64,9 +67,9 @@ public: if (i != outputs.end()) { outputs.erase(i); } - } + //Call process() repeateadly until all available 'input' data is consumed. void run() { std::lock_guard < std::recursive_mutex > busy_lock(busy_update); @@ -78,37 +81,51 @@ public: } protected: - virtual void process() { - // process inputs to output - // distribute(output); - } + // derived class must implement a process() interface + //where typically 'input' data is consummed, procerssed, and then dispatched + //with distribute() to all 'outputs'. + virtual void process() = 0; - void distribute(OutputDataType *output) { - // distribute outputs + //To be used by derived classes implementing + //process() : will dispatch 'item' into as many + //available outputs, previously set by attachOutput(). + void distribute(OutputDataType *item) { + std::lock_guard < std::recursive_mutex > busy_lock(busy_update); - - output->setRefCount((int)outputs.size()); + //We will try to distribute 'output' among all 'outputs', + //so 'output' will a-priori be shared among all 'outputs' so set its ref count to this + //amount. + item->setRefCount((int)outputs.size()); for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) { - - if (!(*outputs_i)->push(output)) { - output->decRefCount(); + //if 'output' failed to be given to an outputs_i, dec its ref count accordingly. + if (!(*outputs_i)->push(item)) { + item->decRefCount(); } } + + // Now 'item' refcount matches the times 'item' has been successfully distributed, + //i.e shared among the outputs. } + //the incoming data queue ThreadQueue *input = nullptr; + + //the n-outputs where to process()-ed data is distribute()-ed. std::vector *> outputs; - typename std::vector *>::iterator outputs_i; + + typename std::vector *>::iterator outputs_i; - //protects input and outputs, must be recursive because ao reentrance + //protects input and outputs, must be recursive because of re-entrance std::recursive_mutex busy_update; }; - +//Specialization much like VisualDataReDistributor, except +//the input (pointer) is directly re-dispatched +//to outputs, so that all output indeed SHARE the same instance. template class VisualDataDistributor : public VisualProcessor { protected: - void process() { + virtual void process() { OutputDataType *inp; while (VisualProcessor::input->try_pop(inp)) { @@ -126,11 +143,12 @@ protected: } }; - +//specialization class which process() take an input item and re-dispatch +//A COPY to every outputs, without further processing. This is a 1-to-n dispatcher. template class VisualDataReDistributor : public VisualProcessor { protected: - void process() { + virtual void process() { OutputDataType *inp; while (VisualProcessor::input->try_pop(inp)) {