1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2025-10-24 01:20:24 -04:00

MIMO: device engine Tx handling (1)

This commit is contained in:
f4exb 2019-10-19 05:07:24 +02:00
parent 1171d71c87
commit 7a1b727a36
6 changed files with 214 additions and 14 deletions

View File

@ -270,6 +270,8 @@ set(sdrbase_HEADERS
util/doublebuffer.h
util/doublebufferfifo.h
util/fixedtraits.h
util/incrementalarray.h
util/incrementalvector.h
util/message.h
util/messagequeue.h
util/movingaverage.h

View File

@ -258,20 +258,42 @@ void DSPDeviceMIMOEngine::workSampleSinkFifos()
if (iPart1Begin != iPart1End)
{
for (unsigned int stream = 0; stream < data.size(); stream++) {
workSamples(data[stream].begin() + iPart1Begin, data[stream].begin() + iPart1End, stream);
workSamplesSink(data[stream].begin() + iPart1Begin, data[stream].begin() + iPart1End, stream);
}
}
if (iPart2Begin != iPart2End)
{
for (unsigned int stream = 0; stream < data.size(); stream++) {
workSamples(data[stream].begin() + iPart2Begin, data[stream].begin() + iPart2End, stream);
workSamplesSink(data[stream].begin() + iPart2Begin, data[stream].begin() + iPart2End, stream);
}
}
}
}
void DSPDeviceMIMOEngine::workSampleSinkFifo(unsigned int stream)
void DSPDeviceMIMOEngine::workSampleSourceFifos()
{
SampleMOFifo* sampleFifo = m_deviceSampleMIMO->getSampleMOFifo();
if (!sampleFifo) {
return;
}
std::vector<SampleVector::const_iterator> vbegin;
vbegin.resize(sampleFifo->getNbStreams());
while ((sampleFifo->remainderSync() > 0) && (m_inputMessageQueue.size() == 0))
{
// pull remainderSync() samples from the sources by stream
for (unsigned int streamIndex = 0; streamIndex < sampleFifo->getNbStreams(); streamIndex++) {
workSamplesSource(vbegin[streamIndex], sampleFifo->remainderSync(), streamIndex);
}
// write pulled samples to FIFO
sampleFifo->writeSync(vbegin, sampleFifo->remainderSync());
}
}
void DSPDeviceMIMOEngine::workSampleSinkFifo(unsigned int streamIndex)
{
SampleMIFifo* sampleFifo = m_deviceSampleMIMO->getSampleMIFifo();
@ -284,26 +306,46 @@ void DSPDeviceMIMOEngine::workSampleSinkFifo(unsigned int stream)
SampleVector::const_iterator part2begin;
SampleVector::const_iterator part2end;
while ((sampleFifo->fillAsync(stream) > 0) && (m_inputMessageQueue.size() == 0))
while ((sampleFifo->fillAsync(streamIndex) > 0) && (m_inputMessageQueue.size() == 0))
{
//unsigned int count = sampleFifo->readAsync(sampleFifo->fillAsync(stream), &part1begin, &part1end, &part2begin, &part2end, stream);
sampleFifo->readAsync(&part1begin, &part1end, &part2begin, &part2end, stream);
sampleFifo->readAsync(&part1begin, &part1end, &part2begin, &part2end, streamIndex);
if (part1begin != part1end) { // first part of FIFO data
workSamples(part1begin, part1end, stream);
workSamplesSink(part1begin, part1end, streamIndex);
}
if (part2begin != part2end) { // second part of FIFO data (used when block wraps around)
workSamples(part2begin, part2end, stream);
workSamplesSink(part2begin, part2end, streamIndex);
}
}
}
void DSPDeviceMIMOEngine::workSampleSourceFifo(unsigned int streamIndex)
{
SampleMOFifo* sampleFifo = m_deviceSampleMIMO->getSampleMOFifo();
if (!sampleFifo) {
return;
}
SampleVector::const_iterator begin;
while ((sampleFifo->remainderAsync(streamIndex) > 0) && (m_inputMessageQueue.size() == 0))
{
// pull remainderAsync() samples from the sources stream
workSamplesSource(begin, sampleFifo->remainderAsync(streamIndex), streamIndex);
// write pulled samples to FIFO's corresponding stream
sampleFifo->writeAsync(begin, sampleFifo->remainderAsync(streamIndex), streamIndex);
}
}
/**
* Routes samples from device source FIFO to sink channels that are registered for the FIFO
* Routes samples from source channels registered for the FIFO to the device sink FIFO
*/
void DSPDeviceMIMOEngine::workSamples(const SampleVector::const_iterator& vbegin, const SampleVector::const_iterator& vend, unsigned int sinkIndex)
void DSPDeviceMIMOEngine::workSamplesSink(const SampleVector::const_iterator& vbegin, const SampleVector::const_iterator& vend, unsigned int sinkIndex)
{
bool positiveOnly = false;
// DC and IQ corrections
@ -336,6 +378,55 @@ void DSPDeviceMIMOEngine::workSamples(const SampleVector::const_iterator& vbegin
}
}
void DSPDeviceMIMOEngine::workSamplesSource(SampleVector::const_iterator& begin, unsigned int nbSamples, unsigned int streamIndex)
{
if (m_threadedBasebandSampleSources[streamIndex].size() == 0)
{
m_sourceSampleBuffers[streamIndex].allocate(nbSamples);
std::fill(
m_sourceSampleBuffers[streamIndex].m_vector.begin(),
m_sourceSampleBuffers[streamIndex].m_vector.begin()+nbSamples,
Sample{0,0}
);
begin = m_sourceSampleBuffers[streamIndex].m_vector.begin();
}
else if (m_threadedBasebandSampleSources[streamIndex].size() == 1)
{
ThreadedBasebandSampleSource *sampleSource = m_threadedBasebandSampleSources[streamIndex].front();
sampleSource->getSampleSourceFifo().readAdvance(begin, nbSamples);
begin -= nbSamples;
}
else
{
SampleVector::const_iterator sourceBegin;
ThreadedBasebandSampleSources::const_iterator it = m_threadedBasebandSampleSources[streamIndex].begin();
ThreadedBasebandSampleSource *sampleSource = *it;
sampleSource->getSampleSourceFifo().readAdvance(sourceBegin, nbSamples);
sourceBegin -= nbSamples;
m_sourceSampleBuffers[streamIndex].allocate(nbSamples);
std::copy(sourceBegin, sourceBegin + nbSamples, m_sourceSampleBuffers[streamIndex].m_vector.begin());
++it;
for (; it != m_threadedBasebandSampleSources[streamIndex].end(); ++it)
{
sampleSource = *it;
sampleSource->getSampleSourceFifo().readAdvance(sourceBegin, nbSamples);
sourceBegin -= nbSamples;
std::transform(
m_sourceSampleBuffers[streamIndex].m_vector.begin(),
m_sourceSampleBuffers[streamIndex].m_vector.begin() + nbSamples,
sourceBegin,
m_sourceSampleBuffers[streamIndex].m_vector.begin(),
[](Sample& a, const Sample& b) -> Sample {
return Sample{a.real()+b.real(), a.imag()+b.imag()};
}
);
}
begin = m_sourceSampleBuffers[streamIndex].m_vector.begin();
}
}
// notStarted -> idle -> init -> running -+
// ^ |
// +-----------------------+
@ -589,10 +680,24 @@ void DSPDeviceMIMOEngine::handleDataRxSync()
}
}
void DSPDeviceMIMOEngine::handleDataRxAsync(int sinkIndex)
void DSPDeviceMIMOEngine::handleDataRxAsync(int streamIndex)
{
if (m_state == StRunning) {
workSampleSinkFifo(sinkIndex);
workSampleSinkFifo(streamIndex);
}
}
void DSPDeviceMIMOEngine::handleDataTxSync()
{
if (m_state == StRunning) {
workSampleSourceFifos();
}
}
void DSPDeviceMIMOEngine::handleDataTxAsync(int streamIndex)
{
if (m_state == StRunning) {
workSampleSourceFifo(streamIndex);
}
}
@ -611,8 +716,10 @@ void DSPDeviceMIMOEngine::handleSetMIMO(DeviceSampleMIMO* mimo)
m_sourcesCorrections.push_back(SourceCorrection());
}
for (int i = 0; i < m_deviceSampleMIMO->getNbSourceFifos(); i++) {
for (int i = 0; i < m_deviceSampleMIMO->getNbSourceFifos(); i++)
{
m_threadedBasebandSampleSources.push_back(ThreadedBasebandSampleSources());
m_sourceSampleBuffers.push_back(IncrementalVector<Sample>());
}
if (m_deviceSampleMIMO->getMIMOType() == DeviceSampleMIMO::MIMOHalfSynchronous) // synchronous FIFOs on Rx and not with Tx
@ -626,6 +733,13 @@ void DSPDeviceMIMOEngine::handleSetMIMO(DeviceSampleMIMO* mimo)
&DSPDeviceMIMOEngine::handleDataRxSync,
Qt::QueuedConnection
);
QObject::connect(
m_deviceSampleMIMO->getSampleMOFifo(),
&SampleMOFifo::dataSyncRead,
this,
&DSPDeviceMIMOEngine::handleDataTxSync,
Qt::QueuedConnection
);
}
else if (m_deviceSampleMIMO->getMIMOType() == DeviceSampleMIMO::MIMOAsynchronous) // asynchronous FIFOs
{
@ -640,6 +754,13 @@ void DSPDeviceMIMOEngine::handleSetMIMO(DeviceSampleMIMO* mimo)
&DSPDeviceMIMOEngine::handleDataRxAsync,
Qt::QueuedConnection
);
QObject::connect(
m_deviceSampleMIMO->getSampleMOFifo(),
&SampleMOFifo::dataAsyncRead,
this,
&DSPDeviceMIMOEngine::handleDataTxAsync,
Qt::QueuedConnection
);
// QObject::connect(
// m_deviceSampleMIMO->getSampleSinkFifo(stream),
// &SampleSinkFifo::dataReady,

View File

@ -26,6 +26,7 @@
#include "util/messagequeue.h"
#include "util/syncmessenger.h"
#include "util/movingaverage.h"
#include "util/incrementalvector.h"
#include "export.h"
class DeviceSampleMIMO;
@ -347,6 +348,7 @@ private:
typedef std::list<ThreadedBasebandSampleSource*> ThreadedBasebandSampleSources;
std::vector<ThreadedBasebandSampleSources> m_threadedBasebandSampleSources; //!< channel sample sources on their own threads (per output stream)
std::vector<IncrementalVector<Sample>> m_sourceSampleBuffers;
typedef std::list<MIMOChannel*> MIMOChannels;
MIMOChannels m_mimoChannels; //!< MIMO channels
@ -358,9 +360,12 @@ private:
unsigned int m_spectrumInputIndex; //!< Index of the stream to be used as spectrum sink input
void run();
void workSampleSinkFifos(); //!< transfer samples of all sinks (sync mode)
void workSampleSinkFifo(unsigned int stream); //!< transfer samples of one sink (async mode)
void workSamples(const SampleVector::const_iterator& vbegin, const SampleVector::const_iterator& vend, unsigned int sinkIndex);
void workSampleSinkFifos(); //!< transfer samples of all sink streams (sync mode)
void workSampleSinkFifo(unsigned int streamIndex); //!< transfer samples of one sink stream (async mode)
void workSamplesSink(const SampleVector::const_iterator& vbegin, const SampleVector::const_iterator& vend, unsigned int streamIndex);
void workSampleSourceFifos(); //!< transfer samples of all source streams (sync mode)
void workSampleSourceFifo(unsigned int streamIndex); //!< transfer samples of one source stream (async mode)
void workSamplesSource(SampleVector::const_iterator& begin, unsigned int nbSamples, unsigned int streamIndex);
State gotoIdle(); //!< Go to the idle state
State gotoInit(); //!< Go to the acquisition init state from idle
@ -373,6 +378,8 @@ private:
private slots:
void handleDataRxSync(); //!< Handle data when Rx samples have to be processed synchronously
void handleDataRxAsync(int streamIndex); //!< Handle data when Rx samples have to be processed asynchronously
void handleDataTxSync(); //!< Handle data when Tx samples have to be processed synchronously
void handleDataTxAsync(int streamIndex); //!< Handle data when Tx samples have to be processed asynchronously
void handleSynchronousMessages(); //!< Handle synchronous messages with the thread
void handleInputMessages(); //!< Handle input message queue
};

View File

@ -68,6 +68,17 @@ void SampleSourceFifo::readAdvance(SampleVector::iterator& readUntil, unsigned i
emit dataRead(nbSamples);
}
void SampleSourceFifo::readAdvance(SampleVector::const_iterator& readUntil, unsigned int nbSamples)
{
// QMutexLocker mutexLocker(&m_mutex);
assert(nbSamples <= m_size/2);
emit dataWrite(nbSamples);
m_ir = (m_ir + nbSamples) % m_size;
readUntil = m_data.begin() + m_size + m_ir;
emit dataRead(nbSamples);
}
void SampleSourceFifo::write(const Sample& sample)
{
m_data[m_iw] = sample;

View File

@ -38,6 +38,7 @@ public:
void init();
/** advance read pointer for the given length and activate R/W signals */
void readAdvance(SampleVector::iterator& readUntil, unsigned int nbSamples);
void readAdvance(SampleVector::const_iterator& readUntil, unsigned int nbSamples);
void getReadIterator(SampleVector::iterator& readUntil); //!< get iterator past the last sample of a read advance operation (i.e. current read iterator)
void getWriteIterator(SampleVector::iterator& writeAt); //!< get iterator to current item for update - write phase 1

View File

@ -0,0 +1,58 @@
///////////////////////////////////////////////////////////////////////////////////
// Copyright (C) 2019 Edouard Griffiths, F4EXB //
// //
// This program is free software; you can redistribute it and/or modify //
// it under the terms of the GNU General Public License as published by //
// the Free Software Foundation as version 3 of the License, or //
// (at your option) any later version. //
// //
// This program is distributed in the hope that it will be useful, //
// but WITHOUT ANY WARRANTY; without even the implied warranty of //
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
// GNU General Public License V3 for more details. //
// //
// You should have received a copy of the GNU General Public License //
// along with this program. If not, see <http://www.gnu.org/licenses/>. //
///////////////////////////////////////////////////////////////////////////////////
#ifndef SDRBASE_UTIL_INCREMENTALVECTOR_H_
#define SDRBASE_UTIL_INCREMENTALVECTOR_H_
#include <stdint.h>
#include <vector>
template<typename T>
class IncrementalVector
{
public:
std::vector<T> m_vector;
IncrementalVector();
~IncrementalVector();
void allocate(uint32_t size);
private:
uint32_t m_size;
};
template<typename T>
IncrementalVector<T>::IncrementalVector() :
m_size(0)
{
}
template<typename T>
IncrementalVector<T>::~IncrementalVector()
{
}
template<typename T>
void IncrementalVector<T>::allocate(uint32_t size)
{
if (size <= m_size) { return; }
m_vector.resize(size);
m_size = size;
}
#endif /* SDRBASE_UTIL_INCREMENTALVECTOR_H_ */