From a541e55ff6f859b5da9c7e9d8ff19e89c5290383 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Fri, 19 May 2017 20:01:07 +0200 Subject: [PATCH] SaopySDRThread simplified, (small) perf improvement: No longer use inpBuffer, write to SDRThreadIQData->data directly. --- src/sdr/SoapySDRThread.cpp | 158 ++++++++++++++++++++++++++----------- src/sdr/SoapySDRThread.h | 5 +- 2 files changed, 114 insertions(+), 49 deletions(-) diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index bc509e0..f5ef41e 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -6,6 +6,7 @@ #include #include "CubicSDR.h" #include +#include #include @@ -134,7 +135,7 @@ bool SDRThread::init() { if (!mtuElems.load()) { mtuElems.store(numElems.load()); } - inpBuffer.data.resize(numElems.load()); + overflowBuffer.data.resize(mtuElems.load()); buffs[0] = malloc(mtuElems.load() * 4 * sizeof(float)); @@ -178,6 +179,15 @@ void SDRThread::deinit() { free(buffs[0]); } +void SDRThread::assureBufferMinSize(SDRThreadIQData * dataOut, size_t minSize) { + + if (dataOut->data.size() < minSize) { + dataOut->data.resize(minSize); + } +} + +//Called in an infinite loop, read SaopySDR device to build +// a 'this.numElems' sized batch of samples (SDRThreadIQData) and push it into iqDataOutQueue. void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { int flags; long long timeNs; @@ -186,41 +196,58 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { int nElems = numElems.load(); int mtElems = mtuElems.load(); - //If overflow occured on the previous readStream(), transfer it in inpBuffer now + //0. Retreive a new batch + SDRThreadIQData *dataOut = buffers.getBuffer(); + + //1.If overflow occured on the previous readStream(), transfer it in dataOut directly. + //Take care of the iq_swap option. if (numOverflow > 0) { - int n_overflow = numOverflow; - if (n_overflow > nElems) { - n_overflow = nElems; - } - memcpy(&inpBuffer.data[0], &overflowBuffer.data[0], n_overflow * sizeof(liquid_float_complex)); + int n_overflow = std::min(numOverflow, nElems); + + assureBufferMinSize(dataOut, n_overflow); + + ::memcpy(&dataOut->data[0], &overflowBuffer.data[0], n_overflow * sizeof(liquid_float_complex)); n_read = n_overflow; - numOverflow -= n_overflow; + + numOverflow = std::min(0, numOverflow - n_overflow); + + // std::cout << "SDRThread::readStream() 1.1 overflowBuffer not empty, collect the remaining " << n_overflow << " samples in it..." << std::endl; - if (numOverflow) { // still some left.. - memmove(&overflowBuffer.data[0], &overflowBuffer.data[n_overflow], numOverflow * sizeof(liquid_float_complex)); + if (numOverflow > 0) { // still some left, shift the remaining samples to the begining.. + ::memmove(&overflowBuffer.data[0], &overflowBuffer.data[n_overflow], numOverflow * sizeof(liquid_float_complex)); + + std::cout << "SDRThread::readStream() 1.2 overflowBuffer still not empty, compact the remaining " << numOverflow << " samples in it..." << std::endl; } - } + } //end if numOverflow > 0 - //attempt readStream() at most nElems, by mtElems-sized chunks, append inpBuffer. + //2. attempt readStream() at most nElems, by mtElems-sized chunks, append in dataOut->data directly. while (n_read < nElems && !stopping) { - int n_requested = nElems-n_read; - + + //Whatever the number of remaining samples needed to reach nElems, we always try to read a mtElems-size chunk, + //from which SoapySDR effectively returns n_stream_read. int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs); //if the n_stream_read <= 0, bail out from reading. if (n_stream_read == 0) { - std::cout << "SDRThread::readStream(): read blocking..." << std::endl; + std::cout << "SDRThread::readStream(): 2. SoapySDR read blocking..." << std::endl; break; } else if (n_stream_read < 0) { - std::cout << "SDRThread::readStream(): read failed with code: " << n_stream_read << std::endl; + std::cout << "SDRThread::readStream(): 2. SoapySDR read failed with code: " << n_stream_read << std::endl; break; } - //sucess read beyond nElems, with overflow + //sucess read beyond nElems, so with overflow: if ((n_read + n_stream_read) > nElems) { - //Copy at most n_requested CF32 into inpBuffer.data liquid_float_complex, + //n_requested is the exact number to reach nElems. + //note: setting n_requested = n_stream_read; + //lead to push the whole n_stream_read, so no overflow management occurs, + //so dataOut->data can exess nElems by at most mtElems. + //TODO: doesn't change CubicSDR behaviour, so why not ? + int n_requested = nElems-n_read; + + //Copy at most n_requested CF32 into .data liquid_float_complex, //starting at n_read position. //inspired from SoapyRTLSDR code, this mysterious void** is indeed an array of CF32(real/imag) samples, indeed an array of //float with the following layout [sample 1 real part , sample 1 imag part, sample 2 real part , sample 2 imag part,sample 3 real part , sample 3 imag part,...etc] @@ -228,29 +255,67 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { //nor that the Re/Im layout of fields matches the float array order, assign liquid_float_complex field by field. float *pp = (float *)buffs[0]; - for (int i = 0; i < n_requested; i++) { - inpBuffer.data[n_read + i].real = pp[2 * i]; - inpBuffer.data[n_read + i].imag = pp[2 * i + 1]; - } + assureBufferMinSize(dataOut, n_read + n_requested); - numOverflow = n_stream_read-n_requested; + if (iq_swap.load()) { + for (int i = 0; i < n_requested; i++) { + dataOut->data[n_read + i].imag = pp[2 * i]; + dataOut->data[n_read + i].real = pp[2 * i + 1]; + } + } else { + for (int i = 0; i < n_requested; i++) { + dataOut->data[n_read + i].real = pp[2 * i]; + dataOut->data[n_read + i].imag = pp[2 * i + 1]; + } + } - //shift of n_requested samples, each one made of 2 floats... + //shift of n_requested samples, each one made of 2 floats... pp += n_requested * 2; + + //numNewOverflow are in exess, they have to be added in the existing overflowBuffer. + int numNewOverflow = n_stream_read - n_requested; + //so push the remainder samples to overflowBuffer: - for (int i = 0; i < numOverflow; i++) { - overflowBuffer.data[i].real = pp[2 * i]; - overflowBuffer.data[i].imag = pp[2 * i + 1]; - } + if (numNewOverflow > 0) { + // std::cout << "SDRThread::readStream(): 2. SoapySDR read make nElems overflow by " << numNewOverflow << " samples..." << std::endl; + } + + assureBufferMinSize(&overflowBuffer, numOverflow + numNewOverflow); + + if (iq_swap.load()) { + + for (int i = 0; i < numNewOverflow; i++) { + overflowBuffer.data[numOverflow + i].imag = pp[2 * i]; + overflowBuffer.data[numOverflow + i].real = pp[2 * i + 1]; + } + } + else { + for (int i = 0; i < numNewOverflow; i++) { + overflowBuffer.data[numOverflow + i].real = pp[2 * i]; + overflowBuffer.data[numOverflow + i].imag = pp[2 * i + 1]; + } + } + numOverflow += numNewOverflow; + n_read += n_requested; - } else if (n_stream_read > 0) { - + } else if (n_stream_read > 0) { // no overflow, read the whole n_stream_read. + float *pp = (float *)buffs[0]; - for (int i = 0; i < n_stream_read; i++) { - inpBuffer.data[n_read + i].real = pp[2 * i]; - inpBuffer.data[n_read + i].imag = pp[2 * i + 1]; - } + assureBufferMinSize(dataOut, n_read + n_stream_read); + + if (iq_swap.load()) { + for (int i = 0; i < n_stream_read; i++) { + dataOut->data[n_read + i].imag = pp[2 * i]; + dataOut->data[n_read + i].real = pp[2 * i + 1]; + } + } + else { + for (int i = 0; i < n_stream_read; i++) { + dataOut->data[n_read + i].real = pp[2 * i]; + dataOut->data[n_read + i].imag = pp[2 * i + 1]; + } + } n_read += n_stream_read; } else { @@ -258,19 +323,12 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { } } //end while + //3. At that point, dataOut contains nElems (or less if a read has return an error), try to post in queue, else discard. if (n_read > 0 && !stopping && !iqDataOutQueue->full()) { - SDRThreadIQData *dataOut = buffers.getBuffer(); - - if (iq_swap.load()) { - dataOut->data.resize(n_read); - for (int i = 0; i < n_read; i++) { - dataOut->data[i].imag = inpBuffer.data[i].real; - dataOut->data[i].real = inpBuffer.data[i].imag; - } - } else { - dataOut->data.assign(inpBuffer.data.begin(), inpBuffer.data.begin()+n_read); - } + //clamp result: + dataOut->data.resize(n_read); + dataOut->frequency = frequency.load(); dataOut->sampleRate = sampleRate.load(); dataOut->dcCorrected = hasHardwareDC.load(); @@ -281,12 +339,16 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { //finally the push didn't suceeded, recycle dataOut immediatly. dataOut->setRefCount(0); - std::cout << "SDRThread::readStream(): iqDataOutQueue output queue is full, discard processing ! " << std::endl; + std::cout << "SDRThread::readStream(): 3.2 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl; //saturation, let a chance to the other threads to consume the existing samples std::this_thread::yield(); } - } + } + else { + dataOut->setRefCount(0); + std::cout << "SDRThread::readStream(): 3.1 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl; + } } void SDRThread::readLoop() { @@ -350,7 +412,7 @@ void SDRThread::updateSettings() { if (!mtuElems.load()) { mtuElems.store(numElems.load()); } - inpBuffer.data.resize(numElems.load()); + overflowBuffer.data.resize(mtuElems.load()); free(buffs[0]); buffs[0] = malloc(mtuElems.load() * 4 * sizeof(float)); diff --git a/src/sdr/SoapySDRThread.h b/src/sdr/SoapySDRThread.h index 2982ac1..e38077f 100644 --- a/src/sdr/SoapySDRThread.h +++ b/src/sdr/SoapySDRThread.h @@ -15,6 +15,7 @@ #include #include +#include class SDRThreadIQData: public ReferenceCounter { public: @@ -99,7 +100,6 @@ protected: SoapySDR::Device *device; void *buffs[1]; ReBuffer buffers; - SDRThreadIQData inpBuffer; SDRThreadIQData overflowBuffer; int numOverflow; std::atomic deviceConfig; @@ -121,4 +121,7 @@ protected: std::map gainChanged; SoapySDR::Kwargs streamArgs; + +private: + void assureBufferMinSize(SDRThreadIQData * dataOut, size_t minSize); };