FIFO for multiple input handling: implementation

This commit is contained in:
f4exb 2019-09-25 18:39:17 +02:00
parent e3082d2ef2
commit 239c5974a7
9 changed files with 134 additions and 111 deletions

View File

@ -50,8 +50,7 @@ TestMI::TestMI(DeviceAPI *deviceAPI) :
m_masterTimer(deviceAPI->getMasterTimer())
{
m_mimoType = MIMOAsynchronous;
m_sampleSinkFifos.push_back(SampleSinkFifo(96000 * 4));
m_sampleSinkFifos.push_back(SampleSinkFifo(96000 * 4));
m_sampleMIFifo.init(2, 96000 * 4);
//m_sampleSinkVectors.resize(2);
m_deviceAPI->setNbSourceStreams(2);
m_networkManager = new QNetworkAccessManager();
@ -104,11 +103,11 @@ bool TestMI::start()
stop();
}
m_testSourceThreads.push_back(new TestMIThread(&m_sampleSinkFifos[0], 0));
m_testSourceThreads.push_back(new TestMIThread(&m_sampleMIFifo, 0));
m_testSourceThreads.back()->setSamplerate(m_settings.m_streams[0].m_sampleRate);
m_testSourceThreads.back()->startStop(true);
m_testSourceThreads.push_back(new TestMIThread(&m_sampleSinkFifos[1], 1));
m_testSourceThreads.push_back(new TestMIThread(&m_sampleMIFifo, 1));
m_testSourceThreads.back()->setSamplerate(m_settings.m_streams[1].m_sampleRate);
m_testSourceThreads.back()->startStop(true);

View File

@ -20,7 +20,7 @@
#include <stdio.h>
#include <errno.h>
#include "dsp/samplesinkfifo.h"
#include "dsp/samplemififo.h"
#include "testmithread.h"
@ -28,7 +28,7 @@
MESSAGE_CLASS_DEFINITION(TestMIThread::MsgStartStop, Message)
TestMIThread::TestMIThread(SampleSinkFifo* sampleFifo, int streamIndex, QObject* parent) :
TestMIThread::TestMIThread(SampleMIFifo* sampleFifo, int streamIndex, QObject* parent) :
QThread(parent),
m_running(false),
m_buf(0),
@ -386,7 +386,7 @@ void TestMIThread::callback(const qint16* buf, qint32 len)
break;
}
m_sampleFifo->write(m_convertBuffer.begin(), it);
m_sampleFifo->writeAsync(m_streamIndex, m_convertBuffer.begin(), it - m_convertBuffer.begin());
}
void TestMIThread::tick()

View File

@ -34,7 +34,7 @@
#define TESTMI_THROTTLE_MS 50
class SampleSinkFifo;
class SampleMIFifo;
class TestMIThread : public QThread {
Q_OBJECT
@ -59,7 +59,7 @@ public:
{ }
};
TestMIThread(SampleSinkFifo* sampleFifo, int streamIndex, QObject* parent = 0);
TestMIThread(SampleMIFifo* sampleFifo, int streamIndex, QObject* parent = nullptr);
~TestMIThread();
void startStop(bool start);
@ -90,7 +90,7 @@ private:
quint32 m_bufsize;
quint32 m_chunksize;
SampleVector m_convertBuffer;
SampleSinkFifo* m_sampleFifo;
SampleMIFifo* m_sampleFifo;
int m_streamIndex;
NCOF m_nco;
NCOF m_toneNco;

View File

@ -51,21 +51,3 @@ SampleSourceFifo* DeviceSampleMIMO::getSampleSourceFifo(unsigned int index)
return &m_sampleSourceFifos[index];
}
}
SampleSinkFifo* DeviceSampleMIMO::getSampleSinkFifo(unsigned int index)
{
if (index >= m_sampleSinkFifos.size()) {
return nullptr;
} else {
return &m_sampleSinkFifos[index];
}
}
SampleSinkVector* DeviceSampleMIMO::getSampleSinkVector(unsigned int index)
{
if (index >= m_sampleSinkVectors.size()) {
return nullptr;
} else {
return &m_sampleSinkVectors[index];
}
}

View File

@ -22,8 +22,7 @@
#include <vector>
#include "samplesourcefifo.h"
#include "samplesinkfifo.h"
#include "samplesinkvector.h"
#include "samplemififo.h"
#include "util/message.h"
#include "util/messagequeue.h"
#include "export.h"
@ -134,14 +133,13 @@ public:
MessageQueue *getMessageQueueToGUI() { return m_guiMessageQueue; }
unsigned int getNbSourceFifos() const { return m_sampleSourceFifos.size(); } //!< Get the number of Tx FIFOs
unsigned int getNbSinkFifos() const { return m_sampleSinkFifos.size(); } //!< Get the number of Rx FIFOs
unsigned int getNbSinkFifos() const { return m_sampleMIFifo.getNbStreams(); } //!< Get the number of Rx FIFOs
SampleSourceFifo* getSampleSourceFifo(unsigned int index); //!< Get Tx FIFO at index
SampleSinkFifo* getSampleSinkFifo(unsigned int index); //!< Get Rx FIFO at index
SampleSinkVector* getSampleSinkVector(unsigned int index); //!< Get Rx vector buffer at index (TODO: remove if not used)
SampleMIFifo* getSampleMIFifo() { return &m_sampleMIFifo; }
// Streams and FIFOs are in opposed source/sink type whick makes it confusing when stream direction is involved:
// Rx: source stream -> sink FIFO -> channel sinks
// Tx: sink stream <- source FIFO <- channel sources
unsigned int getNbSourceStreams() const { return m_sampleSinkFifos.size(); } //!< Commodity function same as getNbSinkFifos (Rx or source streams)
unsigned int getNbSourceStreams() const { return m_sampleMIFifo.getNbStreams(); } //!< Commodity function same as getNbSinkFifos (Rx or source streams)
unsigned int getNbSinkStreams() const { return m_sampleSourceFifos.size(); } //!< Commodity function same as getNbSourceFifos (Tx or sink streams)
protected slots:
@ -150,8 +148,7 @@ protected slots:
protected:
MIMOType m_mimoType;
std::vector<SampleSourceFifo> m_sampleSourceFifos; //!< Tx FIFOs
std::vector<SampleSinkFifo> m_sampleSinkFifos; //!< Rx FIFOs
std::vector<SampleSinkVector> m_sampleSinkVectors; //!< Rx vector buffer (TODO: remove if not used)
SampleMIFifo m_sampleMIFifo; //!< Multiple Input FIFO
MessageQueue m_inputMessageQueue; //!< Input queue to the sink
MessageQueue *m_guiMessageQueue; //!< Input message queue to the GUI
};

View File

@ -267,55 +267,71 @@ QString DSPDeviceMIMOEngine::deviceDescription()
return cmd.getDeviceDescription();
}
void DSPDeviceMIMOEngine::workSampleSinkVector(unsigned int sinkIndex)
void DSPDeviceMIMOEngine::workSampleSinkFifos()
{
SampleSinkVector* sampleFifo = m_deviceSampleMIMO->getSampleSinkVector(sinkIndex);
SampleMIFifo* sampleFifo = m_deviceSampleMIMO->getSampleMIFifo();
if (!sampleFifo) {
return;
}
SampleVector::iterator vbegin;
SampleVector::iterator vend;
sampleFifo->read(vbegin, vend);
std::vector<int> vPart1Begin;
std::vector<int> vPart1End;
std::vector<int> vPart2Begin;
std::vector<int> vPart2End;
std::vector<SampleVector> data = sampleFifo->getData();
workSamplePart(vbegin, vend, sinkIndex);
while (sampleFifo->dataAvailable())
{
sampleFifo->readSync(vPart1Begin, vPart1End, vPart2Begin, vPart2End);
for (unsigned int stream = 0; stream < data.size(); stream++)
{
SampleVector::const_iterator begin = data[stream].begin();
if (vPart1Begin[stream] != vPart1End[stream]) {
m_vectorBuffer.write(begin + vPart1Begin[stream], begin + vPart1End[stream], false);
}
if (vPart2Begin[stream] != vPart2End[stream]) {
m_vectorBuffer.write(begin + vPart2Begin[stream], begin + vPart2End[stream], false);
}
SampleVector::iterator vbegin, vend;
m_vectorBuffer.read(vbegin, vend);
workSamples(vbegin, vend, stream);
}
}
}
void DSPDeviceMIMOEngine::workSampleSinkFifo(unsigned int sinkIndex)
void DSPDeviceMIMOEngine::workSampleSinkFifo(unsigned int stream)
{
SampleSinkFifo* sampleFifo = m_deviceSampleMIMO->getSampleSinkFifo(sinkIndex);
int samplesDone = 0;
SampleMIFifo* sampleFifo = m_deviceSampleMIMO->getSampleMIFifo();
if (!sampleFifo) {
return;
}
while ((sampleFifo->fill() > 0) && (m_inputMessageQueue.size() == 0) && (samplesDone < m_deviceSampleMIMO->getSourceSampleRate(sinkIndex)))
{
SampleVector::iterator part1begin;
SampleVector::iterator part1end;
SampleVector::iterator part2begin;
SampleVector::iterator part2end;
SampleVector::const_iterator part1begin;
SampleVector::const_iterator part1end;
SampleVector::const_iterator part2begin;
SampleVector::const_iterator part2end;
std::size_t count = sampleFifo->readBegin(sampleFifo->fill(), &part1begin, &part1end, &part2begin, &part2end);
while (sampleFifo->dataAvailable(stream))
{
sampleFifo->readAsync(stream, part1begin, part1end, part2begin, part2end);
if (part1begin != part1end) { // first part of FIFO data
m_vectorBuffer.write(part1begin, part1end, false);
//workSamplePart(part1begin, part1end, sinkIndex);
}
if (part2begin != part2end) { // second part of FIFO data (used when block wraps around)
m_vectorBuffer.append(part2begin, part2end);
//workSamplePart(part2begin, part2end, sinkIndex);
}
SampleVector::iterator vbegin, vend;
m_vectorBuffer.read(vbegin, vend);
workSamplePart(vbegin, vend, sinkIndex);
sampleFifo->readCommit((unsigned int) count); // adjust FIFO pointers
samplesDone += count;
workSamples(vbegin, vend, stream);
}
}
@ -323,7 +339,7 @@ void DSPDeviceMIMOEngine::workSampleSinkFifo(unsigned int sinkIndex)
* Routes samples from device source FIFO to sink channels that are registered for the FIFO
* Routes samples from source channels registered for the FIFO to the device sink FIFO
*/
void DSPDeviceMIMOEngine::workSamplePart(const SampleVector::iterator& vbegin, const SampleVector::iterator& vend, unsigned int sinkIndex)
void DSPDeviceMIMOEngine::workSamples(const SampleVector::iterator& vbegin, const SampleVector::iterator& vend, unsigned int sinkIndex)
{
bool positiveOnly = false;
// DC and IQ corrections
@ -603,15 +619,12 @@ DSPDeviceMIMOEngine::State DSPDeviceMIMOEngine::gotoError(const QString& errorMe
void DSPDeviceMIMOEngine::handleDataRxSync()
{
if (m_state == StRunning)
{
for (unsigned int isink = 0; isink < m_deviceSampleMIMO->getNbSinkFifos(); isink++) {
workSampleSinkFifo(isink);
}
if (m_state == StRunning) {
workSampleSinkFifos();
}
}
void DSPDeviceMIMOEngine::handleDataRxAsync(unsigned int sinkIndex)
void DSPDeviceMIMOEngine::handleDataRxAsync(int sinkIndex)
{
if (m_state == StRunning) {
workSampleSinkFifo(sinkIndex);
@ -641,33 +654,34 @@ void DSPDeviceMIMOEngine::handleSetMIMO(DeviceSampleMIMO* mimo)
{
qDebug("DSPDeviceMIMOEngine::handleSetMIMO: synchronous sources set %s", qPrintable(mimo->getDeviceDescription()));
// connect(m_deviceSampleMIMO->getSampleSinkFifo(m_sampleSinkConnectionIndexes[0]), SIGNAL(dataReady()), this, SLOT(handleData()), Qt::QueuedConnection);
for (unsigned int isink = 0; isink < m_deviceSampleMIMO->getNbSinkFifos(); isink++)
{
qDebug("DSPDeviceMIMOEngine::handleSetMIMO: synchronous sources set %s channel %u",
qPrintable(mimo->getDeviceDescription()), isink);
QObject::connect(
m_deviceSampleMIMO->getSampleSinkFifo(isink),
&SampleSinkFifo::dataReady,
this,
[=](){ this->handleDataRxSync(); }, // lambda function is not strictly needed here
Qt::QueuedConnection
);
break; // use first available FIFO for sync signal
}
QObject::connect(
m_deviceSampleMIMO->getSampleMIFifo(),
&SampleMIFifo::dataSyncReady,
this,
&DSPDeviceMIMOEngine::handleDataRxSync,
Qt::QueuedConnection
);
}
else if (m_deviceSampleMIMO->getMIMOType() == DeviceSampleMIMO::MIMOAsynchronous) // asynchronous FIFOs
{
for (unsigned int isink = 0; isink < m_deviceSampleMIMO->getNbSinkFifos(); isink++)
for (unsigned int stream = 0; stream < m_deviceSampleMIMO->getNbSourceStreams(); stream++)
{
qDebug("DSPDeviceMIMOEngine::handleSetMIMO: asynchronous sources set %s channel %u",
qPrintable(mimo->getDeviceDescription()), isink);
qPrintable(mimo->getDeviceDescription()), stream);
QObject::connect(
m_deviceSampleMIMO->getSampleSinkFifo(isink),
&SampleSinkFifo::dataReady,
m_deviceSampleMIMO->getSampleMIFifo(),
&SampleMIFifo::dataAsyncReady,
this,
[=](){ this->handleDataRxAsync(isink); },
&DSPDeviceMIMOEngine::handleDataRxAsync,
Qt::QueuedConnection
);
// QObject::connect(
// m_deviceSampleMIMO->getSampleSinkFifo(stream),
// &SampleSinkFifo::dataReady,
// this,
// [=](){ this->handleDataRxAsync(stream); },
// Qt::QueuedConnection
// );
}
}

View File

@ -381,10 +381,9 @@ private:
SampleSinkVector m_vectorBuffer;
void run();
void workSampleSinkFifo(unsigned int sinkIndex); //!< transfer samples of one sink (asynchronously)
void workSampleSinkVector(unsigned int sinkIndex); //!< same but sample sink vector flavor (TODO: remove if unused)
void workSamplePart(const SampleVector::iterator& vbegin, const SampleVector::iterator& vend, unsigned int sinkIndex);
void workSamplePart(int count);
void workSampleSinkFifos(); //!< transfer samples of all sinks (sync mode)
void workSampleSinkFifo(unsigned int stream); //!< transfer samples of one sink (async mode)
void workSamples(const SampleVector::iterator& vbegin, const SampleVector::iterator& vend, unsigned int sinkIndex);
State gotoIdle(); //!< Go to the idle state
State gotoInit(); //!< Go to the acquisition init state from idle
@ -396,7 +395,7 @@ private:
private slots:
void handleDataRxSync(); //!< Handle data when Rx samples have to be processed synchronously
void handleDataRxAsync(unsigned int sinkIndex); //!< Handle data when Rx samples have to be processed asynchronously
void handleDataRxAsync(int streamIndex); //!< Handle data when Rx samples have to be processed asynchronously
void handleSynchronousMessages(); //!< Handle synchronous messages with the thread
void handleInputMessages(); //!< Handle input message queue
void handleForwardToSpectrumSink(int nbSamples);

View File

@ -78,9 +78,6 @@ void SampleMIFifo::writeSync(const std::vector<SampleVector::const_iterator>& vb
}
}
m_head = m_vHead[0];
m_fill = m_vFill[0];
emit dataSyncReady();
}
@ -138,34 +135,38 @@ void SampleMIFifo::readSync(
}
}
void SampleMIFifo::readSync(int& part1Begin, int& part1End, int& part2Begin, int& part2End)
void SampleMIFifo::readSync(
std::vector<int>& vPart1Begin, std::vector<int>& vPart1End,
std::vector<int>& vPart2Begin, std::vector<int>& vPart2End
)
{
if (m_data.size() == 0) {
return;
}
QMutexLocker mutexLocker(&m_mutex);
std::vector<SampleVector>::iterator dataIt = m_data.begin();
int stream = 0;
if (m_head < m_fill)
for (; dataIt != m_data.end(); ++dataIt, ++stream)
{
part1Begin = m_head;
part1End = m_fill;
part2Begin = 0;
part2End = 0;
}
else
{
part1Begin = m_head;
part2End = m_data[0].size();
part2Begin = 0;
part2End = m_fill;
}
if (m_vHead[stream] < m_vFill[stream])
{
vPart1Begin.push_back(m_vHead[stream]);
vPart1End.push_back(m_vFill[stream]);
vPart2Begin.push_back(0);
vPart2End.push_back(0);
}
else
{
vPart1Begin.push_back(m_vHead[stream]);
vPart1End.push_back(dataIt->size());
vPart2Begin.push_back(0);
vPart2End.push_back(m_vFill[stream]);
}
for (unsigned int stream = 0; stream < m_data.size(); stream++) {
m_vHead[stream] = m_vFill[stream];
}
m_head = m_fill;
}
void SampleMIFifo::readAsync(unsigned int stream,
@ -194,3 +195,30 @@ void SampleMIFifo::readAsync(unsigned int stream,
m_vHead[stream] = m_vFill[stream];
}
}
bool SampleMIFifo::dataAvailable()
{
if (m_data.size() == 0) {
return false;
}
QMutexLocker mutexLocker(&m_mutex);
bool value;
value = m_vHead[0] != m_vFill[0];
return value;
}
bool SampleMIFifo::dataAvailable(unsigned int stream)
{
QMutexLocker mutexLocker(&m_mutex);
bool value;
if (stream < m_data.size()) {
value = m_vHead[stream] != m_vFill[stream];
} else {
value = false;
}
return value;
}

View File

@ -37,11 +37,17 @@ public:
std::vector<SampleVector::const_iterator>& vpart1Begin, std::vector<SampleVector::const_iterator>& vpart1End,
std::vector<SampleVector::const_iterator>& vpart2Begin, std::vector<SampleVector::const_iterator>& vpart2End
);
void readSync(int& part1Begin, int& part1End, int& part2Begin, int& part2End);
void readSync(
std::vector<int>& vPart1Begin, std::vector<int>& vPart1End,
std::vector<int>& vPart2Begin, std::vector<int>& vPart2End
);
void readAsync(unsigned int stream,
SampleVector::const_iterator& part1Begin, SampleVector::const_iterator& part1End,
SampleVector::const_iterator& part2Begin, SampleVector::const_iterator& part2End);
const std::vector<SampleVector>& getData() { return m_data; }
unsigned int getNbStreams() const { return m_data.size(); }
bool dataAvailable();
bool dataAvailable(unsigned int stream);
signals:
@ -52,8 +58,6 @@ private:
std::vector<SampleVector> m_data;
std::vector<int> m_vFill; //!< Number of samples written from beginning of samples vector
std::vector<int> m_vHead; //!< Number of samples read from beginning of samples vector
int m_fill;
int m_head;
QMutex m_mutex;
};