1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2026-06-01 13:47:01 -04:00

SampleSourceFifo refactoring and Tx code reorganization

This commit is contained in:
f4exb
2019-11-15 01:04:24 +01:00
parent 246ff824af
commit 3b74153ec6
198 changed files with 13267 additions and 7750 deletions
+116 -80
View File
@@ -19,16 +19,16 @@
#include <QDebug>
#include "dspcommands.h"
#include "threadedbasebandsamplesource.h"
#include "threadedbasebandsamplesink.h"
#include "basebandsamplesource.h"
#include "devicesamplemimo.h"
#include "mimochannel.h"
#include "dspdevicemimoengine.h"
MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::SetSampleMIMO, Message)
MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::AddThreadedBasebandSampleSource, Message)
MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::RemoveThreadedBasebandSampleSource, Message)
MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::AddBasebandSampleSource, Message)
MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::RemoveBasebandSampleSource, Message)
MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::AddThreadedBasebandSampleSink, Message)
MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::RemoveThreadedBasebandSampleSink, Message)
MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::AddMIMOChannel, Message)
@@ -151,23 +151,23 @@ void DSPDeviceMIMOEngine::setMIMOSequence(int sequence)
m_sampleMIMOSequence = sequence;
}
void DSPDeviceMIMOEngine::addChannelSource(ThreadedBasebandSampleSource* source, int index)
void DSPDeviceMIMOEngine::addChannelSource(BasebandSampleSource* source, int index)
{
qDebug() << "DSPDeviceMIMOEngine::addThreadedSource: "
qDebug() << "DSPDeviceMIMOEngine::addChannelSource: "
<< source->objectName().toStdString().c_str()
<< " at: "
<< index;
AddThreadedBasebandSampleSource cmd(source, index);
AddBasebandSampleSource cmd(source, index);
m_syncMessenger.sendWait(cmd);
}
void DSPDeviceMIMOEngine::removeChannelSource(ThreadedBasebandSampleSource* source, int index)
void DSPDeviceMIMOEngine::removeChannelSource(BasebandSampleSource* source, int index)
{
qDebug() << "DSPDeviceMIMOEngine::removeThreadedSource: "
qDebug() << "DSPDeviceMIMOEngine::removeChannelSource: "
<< source->objectName().toStdString().c_str()
<< " at: "
<< index;
RemoveThreadedBasebandSampleSource cmd(source, index);
RemoveBasebandSampleSource cmd(source, index);
m_syncMessenger.sendWait(cmd);
}
@@ -311,17 +311,30 @@ void DSPDeviceMIMOEngine::workSampleSourceFifos()
}
std::vector<SampleVector::iterator> vbegin;
vbegin.resize(sampleFifo->getNbStreams());
std::vector<SampleVector> data = sampleFifo->getData();
unsigned int iPart1Begin, iPart1End, iPart2Begin, iPart2End;
unsigned int remainder = sampleFifo->remainderSync();
while ((remainder > 0) && (m_inputMessageQueue.size() == 0))
{
sampleFifo->writeSync(remainder, iPart1Begin, iPart1End, iPart2Begin, iPart2End);
// pull samples from the sources by stream
for (unsigned int streamIndex = 0; streamIndex < sampleFifo->getNbStreams(); streamIndex++) {
workSamplesSource(vbegin[streamIndex], remainder, streamIndex);
if (iPart1Begin != iPart1End)
{
for (unsigned int streamIndex = 0; streamIndex < sampleFifo->getNbStreams(); streamIndex++) {
workSamplesSource(data[streamIndex], iPart1Begin, iPart1End, streamIndex);
}
}
// write pulled samples to FIFO
sampleFifo->writeSync(vbegin, remainder);
if (iPart2Begin != iPart2End)
{
for (unsigned int streamIndex = 0; streamIndex < sampleFifo->getNbStreams(); streamIndex++) {
workSamplesSource(data[streamIndex], iPart2Begin, iPart2End, streamIndex);
}
}
// get new remainder
remainder = sampleFifo->remainderSync();
}
@@ -364,15 +377,21 @@ void DSPDeviceMIMOEngine::workSampleSourceFifo(unsigned int streamIndex)
return;
}
SampleVector::iterator begin;
SampleVector& data = sampleFifo->getData(streamIndex);
unsigned int iPart1Begin, iPart1End, iPart2Begin, iPart2End;
unsigned int amount = sampleFifo->remainderAsync(streamIndex);
while ((amount > 0) && (m_inputMessageQueue.size() == 0))
{
// pull remainderAsync() samples from the sources stream
workSamplesSource(begin, amount, streamIndex);
// write pulled samples to FIFO's corresponding stream
sampleFifo->writeAsync(begin, amount, streamIndex);
sampleFifo->writeAsync(amount, iPart1Begin, iPart1End, iPart2Begin, iPart2End, streamIndex);
// part1
if (iPart1Begin != iPart1End) {
workSamplesSource(data, iPart1Begin, iPart1End, streamIndex);
}
// part2
if (iPart2Begin != iPart2End) {
workSamplesSource(data, iPart2Begin, iPart2End, streamIndex);
}
// get new amount
amount = sampleFifo->remainderAsync(streamIndex);
}
@@ -415,41 +434,56 @@ void DSPDeviceMIMOEngine::workSamplesSink(const SampleVector::const_iterator& vb
}
}
void DSPDeviceMIMOEngine::workSamplesSource(SampleVector::iterator& begin, unsigned int nbSamples, unsigned int streamIndex)
void DSPDeviceMIMOEngine::workSamplesSource(SampleVector& data, unsigned int iBegin, unsigned int iEnd, unsigned int streamIndex)
{
if (m_threadedBasebandSampleSources[streamIndex].size() == 0)
unsigned int nbSamples = iEnd - iBegin;
SampleVector::iterator begin = data.begin() + iBegin;
m_sourceZeroBuffers[streamIndex].allocate(nbSamples, Sample{16384,0});
std::copy(
m_sourceZeroBuffers[streamIndex].m_vector.begin(),
m_sourceZeroBuffers[streamIndex].m_vector.begin() + nbSamples,
begin
);
m_spectrumSink->feed(begin, begin + nbSamples, false);
qDebug("DSPDeviceMIMOEngine::workSamplesSource: nbSamples: %u streamIndex: %u", nbSamples, streamIndex);
return;
if (m_basebandSampleSources[streamIndex].size() == 0)
{
m_sourceZeroBuffers[streamIndex].allocate(nbSamples, Sample{0,0});
begin = m_sourceZeroBuffers[streamIndex].m_vector.begin();
std::copy(
m_sourceZeroBuffers[streamIndex].m_vector.begin(),
m_sourceZeroBuffers[streamIndex].m_vector.begin() + nbSamples,
begin
);
}
else if (m_threadedBasebandSampleSources[streamIndex].size() == 1)
else if (m_basebandSampleSources[streamIndex].size() == 1)
{
ThreadedBasebandSampleSource *sampleSource = m_threadedBasebandSampleSources[streamIndex].front();
sampleSource->getSampleSourceFifo().readAdvance(begin, nbSamples);
begin -= nbSamples;
BasebandSampleSource *sampleSource = m_basebandSampleSources[streamIndex].front();
sampleSource->pull(begin, nbSamples);
}
else
{
SampleVector::const_iterator sourceBegin;
ThreadedBasebandSampleSources::const_iterator it = m_threadedBasebandSampleSources[streamIndex].begin();
ThreadedBasebandSampleSource *sampleSource = *it;
sampleSource->getSampleSourceFifo().readAdvance(sourceBegin, nbSamples);
sourceBegin -= nbSamples;
m_sourceSampleBuffers[streamIndex].allocate(nbSamples);
std::copy(sourceBegin, sourceBegin + nbSamples, m_sourceSampleBuffers[streamIndex].m_vector.begin());
++it;
BasebandSampleSources::const_iterator srcIt = m_basebandSampleSources[streamIndex].begin();
BasebandSampleSource *sampleSource = *srcIt;
sampleSource->pull(begin, nbSamples);
++srcIt;
for (; it != m_threadedBasebandSampleSources[streamIndex].end(); ++it)
for (; srcIt != m_basebandSampleSources[streamIndex].end(); ++srcIt)
{
sampleSource = *it;
sampleSource->getSampleSourceFifo().readAdvance(sourceBegin, nbSamples);
sourceBegin -= nbSamples;
sampleSource = *srcIt;
SampleVector::iterator aBegin = m_sourceSampleBuffers[streamIndex].m_vector.begin();
sampleSource->pull(aBegin, nbSamples);
std::transform(
m_sourceSampleBuffers[streamIndex].m_vector.begin(),
m_sourceSampleBuffers[streamIndex].m_vector.begin() + nbSamples,
sourceBegin,
m_sourceSampleBuffers[streamIndex].m_vector.begin(),
[](Sample& a, const Sample& b) -> Sample {
aBegin,
aBegin + nbSamples,
begin,
begin,
[](Sample& a, const Sample& b) -> Sample { // TODO: scale by number of sources
return Sample{a.real()+b.real(), a.imag()+b.imag()};
}
);
@@ -459,7 +493,9 @@ void DSPDeviceMIMOEngine::workSamplesSource(SampleVector::iterator& begin, unsig
}
// pull data from MIMO channels
for (MIMOChannels::const_iterator it = m_mimoChannels.begin(); it != m_mimoChannels.end(); ++it) {
for (MIMOChannels::const_iterator it = m_mimoChannels.begin(); it != m_mimoChannels.end(); ++it)
{
//qDebug("DSPDeviceMIMOEngine::workSamplesSource: nbSamples: %u stream: %u", nbSamples, streamIndex);
(*it)->pull(begin, nbSamples, streamIndex);
}
@@ -543,13 +579,13 @@ DSPDeviceMIMOEngine::State DSPDeviceMIMOEngine::gotoIdle(int subsystemIndex)
m_deviceSampleMIMO->stopTx(); // stop everything
std::vector<ThreadedBasebandSampleSources>::const_iterator vtSourceIt = m_threadedBasebandSampleSources.begin();
std::vector<BasebandSampleSources>::const_iterator vSourceIt = m_basebandSampleSources.begin();
for (; vtSourceIt != m_threadedBasebandSampleSources.end(); vtSourceIt++)
for (; vSourceIt != m_basebandSampleSources.end(); vSourceIt++)
{
for (ThreadedBasebandSampleSources::const_iterator it = vtSourceIt->begin(); it != vtSourceIt->end(); ++it)
for (BasebandSampleSources::const_iterator it = vSourceIt->begin(); it != vSourceIt->end(); ++it)
{
qDebug() << "DSPDeviceMIMOEngine::gotoIdle: stopping ThreadedBasebandSampleSource(" << (*it)->getSampleSourceObjectName().toStdString().c_str() << ")";
qDebug() << "DSPDeviceMIMOEngine::gotoIdle: stopping BasebandSampleSource(" << (*it)->objectName().toStdString().c_str() << ")";
(*it)->stop();
}
}
@@ -664,12 +700,12 @@ DSPDeviceMIMOEngine::State DSPDeviceMIMOEngine::gotoInit(int subsystemIndex)
DSPSignalNotification notif(sinkStreamSampleRate, sinkCenterFrequency);
if (isink < m_threadedBasebandSampleSources.size())
if (isink < m_basebandSampleSources.size())
{
for (ThreadedBasebandSampleSources::const_iterator it = m_threadedBasebandSampleSources[isink].begin(); it != m_threadedBasebandSampleSources[isink].end(); ++it)
for (BasebandSampleSources::const_iterator it = m_basebandSampleSources[isink].begin(); it != m_basebandSampleSources[isink].end(); ++it)
{
qDebug() << "DSPDeviceMIMOEngine::gotoInit: initializing ThreadedSampleSource(" << (*it)->getSampleSourceObjectName().toStdString().c_str() << ")";
(*it)->handleSourceMessage(notif);
qDebug() << "DSPDeviceMIMOEngine::gotoInit: initializing BasebandSampleSource(" << (*it)->objectName().toStdString().c_str() << ")";
(*it)->handleMessage(notif);
}
}
}
@@ -732,17 +768,6 @@ DSPDeviceMIMOEngine::State DSPDeviceMIMOEngine::gotoRunning(int subsystemIndex)
}
}
std::vector<ThreadedBasebandSampleSources>::const_iterator vtSourceIt = m_threadedBasebandSampleSources.begin();
for (; vtSourceIt != m_threadedBasebandSampleSources.end(); vtSourceIt++)
{
for (ThreadedBasebandSampleSources::const_iterator it = vtSourceIt->begin(); it != vtSourceIt->end(); ++it)
{
qDebug() << "DSPDeviceMIMOEngine::gotoRunning: starting ThreadedBasebandSampleSource(" << (*it)->getSampleSourceObjectName().toStdString().c_str() << ")";
(*it)->start();
}
}
for (MIMOChannels::const_iterator it = m_mimoChannels.begin(); it != m_mimoChannels.end(); ++it)
{
qDebug() << "DSPDeviceMIMOEngine::gotoRunning: starting MIMOChannel sinks: " << (*it)->objectName().toStdString().c_str();
@@ -771,6 +796,17 @@ DSPDeviceMIMOEngine::State DSPDeviceMIMOEngine::gotoRunning(int subsystemIndex)
return gotoError(1, "Could not start sample sink");
}
std::vector<BasebandSampleSources>::const_iterator vSourceIt = m_basebandSampleSources.begin();
for (; vSourceIt != m_basebandSampleSources.end(); vSourceIt++)
{
for (BasebandSampleSources::const_iterator it = vSourceIt->begin(); it != vSourceIt->end(); ++it)
{
qDebug() << "DSPDeviceMIMOEngine::gotoRunning: starting BasebandSampleSource(" << (*it)->objectName().toStdString().c_str() << ")";
(*it)->start();
}
}
for (MIMOChannels::const_iterator it = m_mimoChannels.begin(); it != m_mimoChannels.end(); ++it)
{
qDebug() << "DSPDeviceMIMOEngine::gotoRunning: starting MIMOChannel sources: " << (*it)->objectName().toStdString().c_str();
@@ -849,7 +885,7 @@ void DSPDeviceMIMOEngine::handleSetMIMO(DeviceSampleMIMO* mimo)
for (int i = 0; i < m_deviceSampleMIMO->getNbSourceFifos(); i++)
{
m_threadedBasebandSampleSources.push_back(ThreadedBasebandSampleSources());
m_basebandSampleSources.push_back(BasebandSampleSources());
m_sourceSampleBuffers.push_back(IncrementalVector<Sample>());
m_sourceZeroBuffers.push_back(IncrementalVector<Sample>());
}
@@ -1042,36 +1078,36 @@ void DSPDeviceMIMOEngine::handleSynchronousMessages()
m_threadedBasebandSampleSinks[isource].remove(threadedSink);
}
}
else if (AddThreadedBasebandSampleSource::match(*message))
else if (AddBasebandSampleSource::match(*message))
{
const AddThreadedBasebandSampleSource *msg = (AddThreadedBasebandSampleSource *) message;
ThreadedBasebandSampleSource *threadedSource = msg->getThreadedSampleSource();
const AddBasebandSampleSource *msg = (AddBasebandSampleSource *) message;
BasebandSampleSource *sampleSource = msg->getSampleSource();
unsigned int isink = msg->getIndex();
if (isink < m_threadedBasebandSampleSources.size())
if (isink < m_basebandSampleSources.size())
{
m_threadedBasebandSampleSources[isink].push_back(threadedSource);
m_basebandSampleSources[isink].push_back(sampleSource);
// initialize sample rate and center frequency in the sink:
int sinkStreamSampleRate = m_deviceSampleMIMO->getSinkSampleRate(isink);
quint64 sinkCenterFrequency = m_deviceSampleMIMO->getSinkCenterFrequency(isink);
DSPSignalNotification msg(sinkStreamSampleRate, sinkCenterFrequency);
threadedSource->handleSourceMessage(msg);
sampleSource->handleMessage(msg);
// start the sink:
if (m_stateTx == StRunning) {
threadedSource->start();
sampleSource->start();
}
}
}
else if (RemoveThreadedBasebandSampleSource::match(*message))
else if (RemoveBasebandSampleSource::match(*message))
{
const RemoveThreadedBasebandSampleSource *msg = (RemoveThreadedBasebandSampleSource *) message;
ThreadedBasebandSampleSource* threadedSource = msg->getThreadedSampleSource();
const RemoveBasebandSampleSource *msg = (RemoveBasebandSampleSource *) message;
BasebandSampleSource* sampleSource = msg->getSampleSource();
unsigned int isink = msg->getIndex();
if (isink < m_threadedBasebandSampleSources.size())
if (isink < m_basebandSampleSources.size())
{
threadedSource->stop();
m_threadedBasebandSampleSources[isink].remove(threadedSource);
sampleSource->stop();
m_basebandSampleSources[isink].remove(sampleSource);
}
}
else if (AddMIMOChannel::match(*message))
@@ -1297,12 +1333,12 @@ void DSPDeviceMIMOEngine::handleInputMessages()
DSPSignalNotification *message = new DSPSignalNotification(sampleRate, centerFrequency);
// forward source changes to channel sources with immediate execution (no queuing)
if (istream < m_threadedBasebandSampleSources.size())
if (istream < m_basebandSampleSources.size())
{
for (ThreadedBasebandSampleSources::const_iterator it = m_threadedBasebandSampleSources[istream].begin(); it != m_threadedBasebandSampleSources[istream].end(); ++it)
for (BasebandSampleSources::const_iterator it = m_basebandSampleSources[istream].begin(); it != m_basebandSampleSources[istream].end(); ++it)
{
qDebug() << "DSPDeviceMIMOEngine::handleSinkMessages: forward message to ThreadedSampleSource(" << (*it)->getSampleSourceObjectName().toStdString().c_str() << ")";
(*it)->handleSourceMessage(*message);
qDebug() << "DSPDeviceMIMOEngine::handleSinkMessages: forward message to BasebandSampleSource(" << (*it)->objectName().toStdString().c_str() << ")";
(*it)->handleMessage(*message);
}
}