mirror of
https://github.com/cjcliffe/CubicSDR.git
synced 2024-11-16 17:11:46 -05:00
SaopySDRThread simplified, (small) perf improvement: No longer use inpBuffer,
write to SDRThreadIQData->data directly.
This commit is contained in:
parent
21a6d2b5f6
commit
a541e55ff6
@ -6,6 +6,7 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
#include "CubicSDR.h"
|
#include "CubicSDR.h"
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <algorithm>
|
||||||
#include <SoapySDR/Logger.h>
|
#include <SoapySDR/Logger.h>
|
||||||
|
|
||||||
|
|
||||||
@ -134,7 +135,7 @@ bool SDRThread::init() {
|
|||||||
if (!mtuElems.load()) {
|
if (!mtuElems.load()) {
|
||||||
mtuElems.store(numElems.load());
|
mtuElems.store(numElems.load());
|
||||||
}
|
}
|
||||||
inpBuffer.data.resize(numElems.load());
|
|
||||||
overflowBuffer.data.resize(mtuElems.load());
|
overflowBuffer.data.resize(mtuElems.load());
|
||||||
|
|
||||||
buffs[0] = malloc(mtuElems.load() * 4 * sizeof(float));
|
buffs[0] = malloc(mtuElems.load() * 4 * sizeof(float));
|
||||||
@ -178,6 +179,15 @@ void SDRThread::deinit() {
|
|||||||
free(buffs[0]);
|
free(buffs[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SDRThread::assureBufferMinSize(SDRThreadIQData * dataOut, size_t minSize) {
|
||||||
|
|
||||||
|
if (dataOut->data.size() < minSize) {
|
||||||
|
dataOut->data.resize(minSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called in an infinite loop, read SaopySDR device to build
|
||||||
|
// a 'this.numElems' sized batch of samples (SDRThreadIQData) and push it into iqDataOutQueue.
|
||||||
void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
||||||
int flags;
|
int flags;
|
||||||
long long timeNs;
|
long long timeNs;
|
||||||
@ -186,41 +196,58 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
|||||||
int nElems = numElems.load();
|
int nElems = numElems.load();
|
||||||
int mtElems = mtuElems.load();
|
int mtElems = mtuElems.load();
|
||||||
|
|
||||||
//If overflow occured on the previous readStream(), transfer it in inpBuffer now
|
//0. Retreive a new batch
|
||||||
|
SDRThreadIQData *dataOut = buffers.getBuffer();
|
||||||
|
|
||||||
|
//1.If overflow occured on the previous readStream(), transfer it in dataOut directly.
|
||||||
|
//Take care of the iq_swap option.
|
||||||
if (numOverflow > 0) {
|
if (numOverflow > 0) {
|
||||||
int n_overflow = numOverflow;
|
int n_overflow = std::min(numOverflow, nElems);
|
||||||
if (n_overflow > nElems) {
|
|
||||||
n_overflow = nElems;
|
assureBufferMinSize(dataOut, n_overflow);
|
||||||
}
|
|
||||||
memcpy(&inpBuffer.data[0], &overflowBuffer.data[0], n_overflow * sizeof(liquid_float_complex));
|
::memcpy(&dataOut->data[0], &overflowBuffer.data[0], n_overflow * sizeof(liquid_float_complex));
|
||||||
n_read = n_overflow;
|
n_read = n_overflow;
|
||||||
numOverflow -= n_overflow;
|
|
||||||
|
|
||||||
if (numOverflow) { // still some left..
|
numOverflow = std::min(0, numOverflow - n_overflow);
|
||||||
memmove(&overflowBuffer.data[0], &overflowBuffer.data[n_overflow], numOverflow * sizeof(liquid_float_complex));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//attempt readStream() at most nElems, by mtElems-sized chunks, append inpBuffer.
|
// std::cout << "SDRThread::readStream() 1.1 overflowBuffer not empty, collect the remaining " << n_overflow << " samples in it..." << std::endl;
|
||||||
|
|
||||||
|
if (numOverflow > 0) { // still some left, shift the remaining samples to the begining..
|
||||||
|
::memmove(&overflowBuffer.data[0], &overflowBuffer.data[n_overflow], numOverflow * sizeof(liquid_float_complex));
|
||||||
|
|
||||||
|
std::cout << "SDRThread::readStream() 1.2 overflowBuffer still not empty, compact the remaining " << numOverflow << " samples in it..." << std::endl;
|
||||||
|
}
|
||||||
|
} //end if numOverflow > 0
|
||||||
|
|
||||||
|
//2. attempt readStream() at most nElems, by mtElems-sized chunks, append in dataOut->data directly.
|
||||||
while (n_read < nElems && !stopping) {
|
while (n_read < nElems && !stopping) {
|
||||||
int n_requested = nElems-n_read;
|
|
||||||
|
|
||||||
|
//Whatever the number of remaining samples needed to reach nElems, we always try to read a mtElems-size chunk,
|
||||||
|
//from which SoapySDR effectively returns n_stream_read.
|
||||||
int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs);
|
int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs);
|
||||||
|
|
||||||
//if the n_stream_read <= 0, bail out from reading.
|
//if the n_stream_read <= 0, bail out from reading.
|
||||||
if (n_stream_read == 0) {
|
if (n_stream_read == 0) {
|
||||||
std::cout << "SDRThread::readStream(): read blocking..." << std::endl;
|
std::cout << "SDRThread::readStream(): 2. SoapySDR read blocking..." << std::endl;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else if (n_stream_read < 0) {
|
else if (n_stream_read < 0) {
|
||||||
std::cout << "SDRThread::readStream(): read failed with code: " << n_stream_read << std::endl;
|
std::cout << "SDRThread::readStream(): 2. SoapySDR read failed with code: " << n_stream_read << std::endl;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
//sucess read beyond nElems, with overflow
|
//sucess read beyond nElems, so with overflow:
|
||||||
if ((n_read + n_stream_read) > nElems) {
|
if ((n_read + n_stream_read) > nElems) {
|
||||||
|
|
||||||
//Copy at most n_requested CF32 into inpBuffer.data liquid_float_complex,
|
//n_requested is the exact number to reach nElems.
|
||||||
|
//note: setting n_requested = n_stream_read;
|
||||||
|
//lead to push the whole n_stream_read, so no overflow management occurs,
|
||||||
|
//so dataOut->data can exess nElems by at most mtElems.
|
||||||
|
//TODO: doesn't change CubicSDR behaviour, so why not ?
|
||||||
|
int n_requested = nElems-n_read;
|
||||||
|
|
||||||
|
//Copy at most n_requested CF32 into .data liquid_float_complex,
|
||||||
//starting at n_read position.
|
//starting at n_read position.
|
||||||
//inspired from SoapyRTLSDR code, this mysterious void** is indeed an array of CF32(real/imag) samples, indeed an array of
|
//inspired from SoapyRTLSDR code, this mysterious void** is indeed an array of CF32(real/imag) samples, indeed an array of
|
||||||
//float with the following layout [sample 1 real part , sample 1 imag part, sample 2 real part , sample 2 imag part,sample 3 real part , sample 3 imag part,...etc]
|
//float with the following layout [sample 1 real part , sample 1 imag part, sample 2 real part , sample 2 imag part,sample 3 real part , sample 3 imag part,...etc]
|
||||||
@ -228,28 +255,66 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
|||||||
//nor that the Re/Im layout of fields matches the float array order, assign liquid_float_complex field by field.
|
//nor that the Re/Im layout of fields matches the float array order, assign liquid_float_complex field by field.
|
||||||
float *pp = (float *)buffs[0];
|
float *pp = (float *)buffs[0];
|
||||||
|
|
||||||
for (int i = 0; i < n_requested; i++) {
|
assureBufferMinSize(dataOut, n_read + n_requested);
|
||||||
inpBuffer.data[n_read + i].real = pp[2 * i];
|
|
||||||
inpBuffer.data[n_read + i].imag = pp[2 * i + 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
numOverflow = n_stream_read-n_requested;
|
if (iq_swap.load()) {
|
||||||
|
for (int i = 0; i < n_requested; i++) {
|
||||||
|
dataOut->data[n_read + i].imag = pp[2 * i];
|
||||||
|
dataOut->data[n_read + i].real = pp[2 * i + 1];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int i = 0; i < n_requested; i++) {
|
||||||
|
dataOut->data[n_read + i].real = pp[2 * i];
|
||||||
|
dataOut->data[n_read + i].imag = pp[2 * i + 1];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//shift of n_requested samples, each one made of 2 floats...
|
//shift of n_requested samples, each one made of 2 floats...
|
||||||
pp += n_requested * 2;
|
pp += n_requested * 2;
|
||||||
|
|
||||||
|
//numNewOverflow are in exess, they have to be added in the existing overflowBuffer.
|
||||||
|
int numNewOverflow = n_stream_read - n_requested;
|
||||||
|
|
||||||
//so push the remainder samples to overflowBuffer:
|
//so push the remainder samples to overflowBuffer:
|
||||||
for (int i = 0; i < numOverflow; i++) {
|
if (numNewOverflow > 0) {
|
||||||
overflowBuffer.data[i].real = pp[2 * i];
|
// std::cout << "SDRThread::readStream(): 2. SoapySDR read make nElems overflow by " << numNewOverflow << " samples..." << std::endl;
|
||||||
overflowBuffer.data[i].imag = pp[2 * i + 1];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assureBufferMinSize(&overflowBuffer, numOverflow + numNewOverflow);
|
||||||
|
|
||||||
|
if (iq_swap.load()) {
|
||||||
|
|
||||||
|
for (int i = 0; i < numNewOverflow; i++) {
|
||||||
|
overflowBuffer.data[numOverflow + i].imag = pp[2 * i];
|
||||||
|
overflowBuffer.data[numOverflow + i].real = pp[2 * i + 1];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
for (int i = 0; i < numNewOverflow; i++) {
|
||||||
|
overflowBuffer.data[numOverflow + i].real = pp[2 * i];
|
||||||
|
overflowBuffer.data[numOverflow + i].imag = pp[2 * i + 1];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
numOverflow += numNewOverflow;
|
||||||
|
|
||||||
n_read += n_requested;
|
n_read += n_requested;
|
||||||
} else if (n_stream_read > 0) {
|
} else if (n_stream_read > 0) { // no overflow, read the whole n_stream_read.
|
||||||
|
|
||||||
float *pp = (float *)buffs[0];
|
float *pp = (float *)buffs[0];
|
||||||
|
|
||||||
|
assureBufferMinSize(dataOut, n_read + n_stream_read);
|
||||||
|
|
||||||
|
if (iq_swap.load()) {
|
||||||
for (int i = 0; i < n_stream_read; i++) {
|
for (int i = 0; i < n_stream_read; i++) {
|
||||||
inpBuffer.data[n_read + i].real = pp[2 * i];
|
dataOut->data[n_read + i].imag = pp[2 * i];
|
||||||
inpBuffer.data[n_read + i].imag = pp[2 * i + 1];
|
dataOut->data[n_read + i].real = pp[2 * i + 1];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
for (int i = 0; i < n_stream_read; i++) {
|
||||||
|
dataOut->data[n_read + i].real = pp[2 * i];
|
||||||
|
dataOut->data[n_read + i].imag = pp[2 * i + 1];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
n_read += n_stream_read;
|
n_read += n_stream_read;
|
||||||
@ -258,18 +323,11 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
|||||||
}
|
}
|
||||||
} //end while
|
} //end while
|
||||||
|
|
||||||
|
//3. At that point, dataOut contains nElems (or less if a read has return an error), try to post in queue, else discard.
|
||||||
if (n_read > 0 && !stopping && !iqDataOutQueue->full()) {
|
if (n_read > 0 && !stopping && !iqDataOutQueue->full()) {
|
||||||
SDRThreadIQData *dataOut = buffers.getBuffer();
|
|
||||||
|
|
||||||
if (iq_swap.load()) {
|
//clamp result:
|
||||||
dataOut->data.resize(n_read);
|
dataOut->data.resize(n_read);
|
||||||
for (int i = 0; i < n_read; i++) {
|
|
||||||
dataOut->data[i].imag = inpBuffer.data[i].real;
|
|
||||||
dataOut->data[i].real = inpBuffer.data[i].imag;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
dataOut->data.assign(inpBuffer.data.begin(), inpBuffer.data.begin()+n_read);
|
|
||||||
}
|
|
||||||
|
|
||||||
dataOut->frequency = frequency.load();
|
dataOut->frequency = frequency.load();
|
||||||
dataOut->sampleRate = sampleRate.load();
|
dataOut->sampleRate = sampleRate.load();
|
||||||
@ -281,12 +339,16 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) {
|
|||||||
//finally the push didn't suceeded, recycle dataOut immediatly.
|
//finally the push didn't suceeded, recycle dataOut immediatly.
|
||||||
dataOut->setRefCount(0);
|
dataOut->setRefCount(0);
|
||||||
|
|
||||||
std::cout << "SDRThread::readStream(): iqDataOutQueue output queue is full, discard processing ! " << std::endl;
|
std::cout << "SDRThread::readStream(): 3.2 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl;
|
||||||
|
|
||||||
//saturation, let a chance to the other threads to consume the existing samples
|
//saturation, let a chance to the other threads to consume the existing samples
|
||||||
std::this_thread::yield();
|
std::this_thread::yield();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
dataOut->setRefCount(0);
|
||||||
|
std::cout << "SDRThread::readStream(): 3.1 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void SDRThread::readLoop() {
|
void SDRThread::readLoop() {
|
||||||
@ -350,7 +412,7 @@ void SDRThread::updateSettings() {
|
|||||||
if (!mtuElems.load()) {
|
if (!mtuElems.load()) {
|
||||||
mtuElems.store(numElems.load());
|
mtuElems.store(numElems.load());
|
||||||
}
|
}
|
||||||
inpBuffer.data.resize(numElems.load());
|
|
||||||
overflowBuffer.data.resize(mtuElems.load());
|
overflowBuffer.data.resize(mtuElems.load());
|
||||||
free(buffs[0]);
|
free(buffs[0]);
|
||||||
buffs[0] = malloc(mtuElems.load() * 4 * sizeof(float));
|
buffs[0] = malloc(mtuElems.load() * 4 * sizeof(float));
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#include <SoapySDR/Registry.hpp>
|
#include <SoapySDR/Registry.hpp>
|
||||||
#include <SoapySDR/Device.hpp>
|
#include <SoapySDR/Device.hpp>
|
||||||
|
|
||||||
|
#include <stddef.h>
|
||||||
|
|
||||||
class SDRThreadIQData: public ReferenceCounter {
|
class SDRThreadIQData: public ReferenceCounter {
|
||||||
public:
|
public:
|
||||||
@ -99,7 +100,6 @@ protected:
|
|||||||
SoapySDR::Device *device;
|
SoapySDR::Device *device;
|
||||||
void *buffs[1];
|
void *buffs[1];
|
||||||
ReBuffer<SDRThreadIQData> buffers;
|
ReBuffer<SDRThreadIQData> buffers;
|
||||||
SDRThreadIQData inpBuffer;
|
|
||||||
SDRThreadIQData overflowBuffer;
|
SDRThreadIQData overflowBuffer;
|
||||||
int numOverflow;
|
int numOverflow;
|
||||||
std::atomic<DeviceConfig *> deviceConfig;
|
std::atomic<DeviceConfig *> deviceConfig;
|
||||||
@ -121,4 +121,7 @@ protected:
|
|||||||
std::map<std::string, bool> gainChanged;
|
std::map<std::string, bool> gainChanged;
|
||||||
|
|
||||||
SoapySDR::Kwargs streamArgs;
|
SoapySDR::Kwargs streamArgs;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void assureBufferMinSize(SDRThreadIQData * dataOut, size_t minSize);
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user