From ff617b6f7dc1ad0e969b10bd828b619cd1170945 Mon Sep 17 00:00:00 2001 From: f4exb Date: Wed, 2 Oct 2019 02:05:33 +0200 Subject: [PATCH] Reworked sample MI FIFO and MIMO device engine --- plugins/samplemimo/testmi/testmi.cpp | 3 - plugins/samplemimo/testmi/testmithread.cpp | 2 +- sdrbase/device/deviceapi.cpp | 28 - sdrbase/device/deviceapi.h | 6 - sdrbase/dsp/dspdevicemimoengine.cpp | 91 +--- sdrbase/dsp/dspdevicemimoengine.h | 23 +- sdrbase/dsp/samplemififo.cpp | 563 ++++++++++++++++----- sdrbase/dsp/samplemififo.h | 88 +++- 8 files changed, 511 insertions(+), 293 deletions(-) diff --git a/plugins/samplemimo/testmi/testmi.cpp b/plugins/samplemimo/testmi/testmi.cpp index 5e6b5ec8e..e0cd9aae9 100644 --- a/plugins/samplemimo/testmi/testmi.cpp +++ b/plugins/samplemimo/testmi/testmi.cpp @@ -74,9 +74,6 @@ TestMI::~TestMI() m_deviceAPI->removeAncillarySink(*it, istream); delete *it; } - - m_deviceAPI->removeLastSourceStream(); // Remove the last source stream data set in the engine - m_deviceAPI->removeLastSourceStream(); // Remove the last source stream data set in the engine } void TestMI::destroy() diff --git a/plugins/samplemimo/testmi/testmithread.cpp b/plugins/samplemimo/testmi/testmithread.cpp index 3162868e7..c56bca121 100644 --- a/plugins/samplemimo/testmi/testmithread.cpp +++ b/plugins/samplemimo/testmi/testmithread.cpp @@ -386,7 +386,7 @@ void TestMIThread::callback(const qint16* buf, qint32 len) break; } - m_sampleFifo->writeAsync(m_streamIndex, m_convertBuffer.begin(), it - m_convertBuffer.begin()); + m_sampleFifo->writeAsync(m_convertBuffer.begin(), it - m_convertBuffer.begin(), m_streamIndex); } void TestMIThread::tick() diff --git a/sdrbase/device/deviceapi.cpp b/sdrbase/device/deviceapi.cpp index afcb178c7..22fc7043d 100644 --- a/sdrbase/device/deviceapi.cpp +++ b/sdrbase/device/deviceapi.cpp @@ -58,34 +58,6 @@ DeviceAPI::~DeviceAPI() { } -void DeviceAPI::addSourceStream() -{ - if (m_deviceMIMOEngine) { - m_deviceMIMOEngine->addSourceStream(); - } -} - -void DeviceAPI::removeLastSourceStream() -{ - if (m_deviceMIMOEngine) { - m_deviceMIMOEngine->removeLastSourceStream(); - } -} - -void DeviceAPI::addSinkStream() -{ - if (m_deviceMIMOEngine) { - m_deviceMIMOEngine->addSinkStream(); - } -} - -void DeviceAPI::removeLastSinkStream() -{ - if (m_deviceMIMOEngine) { - m_deviceMIMOEngine->removeLastSinkStream(); - } -} - void DeviceAPI::addAncillarySink(BasebandSampleSink *sink, unsigned int index) { if (m_deviceSourceEngine) { diff --git a/sdrbase/device/deviceapi.h b/sdrbase/device/deviceapi.h index 60bc044ea..b8e756e19 100644 --- a/sdrbase/device/deviceapi.h +++ b/sdrbase/device/deviceapi.h @@ -67,12 +67,6 @@ public: ); ~DeviceAPI(); - // MIMO Engine baseband / channel lists management - void addSourceStream(); - void removeLastSourceStream(); - void addSinkStream(); - void removeLastSinkStream(); - void addAncillarySink(BasebandSampleSink* sink, unsigned int index = 0); //!< Adds a sink to receive full baseband and that is not a channel (e.g. spectrum) void removeAncillarySink(BasebandSampleSink* sink, unsigned int index = 0); //!< Removes it void setSpectrumSinkInput(bool sourceElseSink = true, unsigned int index = 0); //!< Used in the MIMO case to select which stream is used as input to main spectrum diff --git a/sdrbase/dsp/dspdevicemimoengine.cpp b/sdrbase/dsp/dspdevicemimoengine.cpp index d4fdd6e9f..cd6d471dc 100644 --- a/sdrbase/dsp/dspdevicemimoengine.cpp +++ b/sdrbase/dsp/dspdevicemimoengine.cpp @@ -27,10 +27,6 @@ #include "dspdevicemimoengine.h" MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::SetSampleMIMO, Message) -MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::AddSourceStream, Message) -MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::RemoveLastSourceStream, Message) -MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::AddSinkStream, Message) -MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::RemoveLastSinkStream, Message) MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::AddThreadedBasebandSampleSource, Message) MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::RemoveThreadedBasebandSampleSource, Message) MESSAGE_CLASS_DEFINITION(DSPDeviceMIMOEngine::AddThreadedBasebandSampleSink, Message) @@ -124,34 +120,6 @@ void DSPDeviceMIMOEngine::setMIMOSequence(int sequence) m_sampleMIMOSequence = sequence; } -void DSPDeviceMIMOEngine::addSourceStream() -{ - qDebug("DSPDeviceMIMOEngine::addSourceStream"); - AddSourceStream cmd; - m_syncMessenger.sendWait(cmd); -} - -void DSPDeviceMIMOEngine::removeLastSourceStream() -{ - qDebug("DSPDeviceMIMOEngine::removeLastSourceStream"); - RemoveLastSourceStream cmd; - m_syncMessenger.sendWait(cmd); -} - -void DSPDeviceMIMOEngine::addSinkStream() -{ - qDebug("DSPDeviceMIMOEngine::addSinkStream"); - AddSinkStream cmd; - m_syncMessenger.sendWait(cmd); -} - -void DSPDeviceMIMOEngine::removeLastSinkStream() -{ - qDebug("DSPDeviceMIMOEngine::removeLastSinkStream"); - RemoveLastSourceStream cmd; - m_syncMessenger.sendWait(cmd); -} - void DSPDeviceMIMOEngine::addChannelSource(ThreadedBasebandSampleSource* source, int index) { qDebug() << "DSPDeviceMIMOEngine::addThreadedSource: " @@ -275,36 +243,34 @@ void DSPDeviceMIMOEngine::workSampleSinkFifos() return; } - std::vector vPart1Begin; - std::vector vPart1End; - std::vector vPart2Begin; - std::vector vPart2End; + int iPart1Begin; + int iPart1End; + int iPart2Begin; + int iPart2End; std::vector data = sampleFifo->getData(); - while (sampleFifo->dataAvailable()) + while (sampleFifo->fillSync() > 0) { - vPart1Begin.clear(); - vPart1End.clear(); - vPart2Begin.clear(); - vPart2End.clear(); - sampleFifo->readSync(vPart1Begin, vPart1End, vPart2Begin, vPart2End); + unsigned int count = sampleFifo->readSync(sampleFifo->fillSync(), iPart1Begin, iPart1End, iPart2Begin, iPart2End); for (unsigned int stream = 0; stream < data.size(); stream++) { SampleVector::const_iterator begin = data[stream].begin(); - if (vPart1Begin[stream] != vPart1End[stream]) { - m_vectorBuffer.write(begin + vPart1Begin[stream], begin + vPart1End[stream], false); + if (iPart1Begin != iPart1End) { + m_vectorBuffer.write(data[stream].begin() + iPart1Begin, data[stream].begin() + iPart1End, false); } - if (vPart2Begin[stream] != vPart2End[stream]) { - m_vectorBuffer.write(begin + vPart2Begin[stream], begin + vPart2End[stream]); + if (iPart2Begin != iPart2End) { + m_vectorBuffer.write(data[stream].begin() + iPart2Begin, data[stream].begin() + iPart2End); } SampleVector::iterator vbegin, vend; m_vectorBuffer.read(vbegin, vend); workSamples(vbegin, vend, stream); } + + sampleFifo->readCommitSync(count); } } @@ -321,9 +287,9 @@ void DSPDeviceMIMOEngine::workSampleSinkFifo(unsigned int stream) SampleVector::const_iterator part2begin; SampleVector::const_iterator part2end; - while (sampleFifo->dataAvailable(stream)) + while (sampleFifo->fillAsync(stream) > 0) { - sampleFifo->readAsync(stream, part1begin, part1end, part2begin, part2end); + unsigned int count = sampleFifo->readAsync(sampleFifo->fillAsync(stream), &part1begin, &part1end, &part2begin, &part2end, stream); if (part1begin != part1end) { // first part of FIFO data m_vectorBuffer.write(part1begin, part1end, false); @@ -336,6 +302,8 @@ void DSPDeviceMIMOEngine::workSampleSinkFifo(unsigned int stream) SampleVector::iterator vbegin, vend; m_vectorBuffer.read(vbegin, vend); workSamples(vbegin, vend, stream); + + sampleFifo->readCommitAsync(count, stream); } } @@ -343,13 +311,13 @@ void DSPDeviceMIMOEngine::workSampleSinkFifo(unsigned int stream) * Routes samples from device source FIFO to sink channels that are registered for the FIFO * Routes samples from source channels registered for the FIFO to the device sink FIFO */ -void DSPDeviceMIMOEngine::workSamples(const SampleVector::iterator& vbegin, const SampleVector::iterator& vend, unsigned int sinkIndex) +void DSPDeviceMIMOEngine::workSamples(const SampleVector::const_iterator& vbegin, const SampleVector::const_iterator& vend, unsigned int sinkIndex) { bool positiveOnly = false; // DC and IQ corrections - if (m_sourcesCorrections[sinkIndex].m_dcOffsetCorrection) { - iqCorrections(vbegin, vend, sinkIndex, m_sourcesCorrections[sinkIndex].m_iqImbalanceCorrection); - } + // if (m_sourcesCorrections[sinkIndex].m_dcOffsetCorrection) { + // iqCorrections(vbegin, vend, sinkIndex, m_sourcesCorrections[sinkIndex].m_iqImbalanceCorrection); + // } // feed data to direct sinks if (sinkIndex < m_basebandSampleSinks.size()) @@ -726,25 +694,6 @@ void DSPDeviceMIMOEngine::handleSynchronousMessages() else if (SetSampleMIMO::match(*message)) { handleSetMIMO(((SetSampleMIMO*) message)->getSampleMIMO()); } - else if (AddSourceStream::match(*message)) - { - m_basebandSampleSinks.push_back(BasebandSampleSinks()); - m_threadedBasebandSampleSinks.push_back(ThreadedBasebandSampleSinks()); - m_sourcesCorrections.push_back(SourceCorrection()); - } - else if (RemoveLastSourceStream::match(*message)) - { - m_basebandSampleSinks.pop_back(); - m_threadedBasebandSampleSinks.pop_back(); - } - else if (AddSinkStream::match(*message)) - { - m_threadedBasebandSampleSources.push_back(ThreadedBasebandSampleSources()); - } - else if (RemoveLastSinkStream::match(*message)) - { - m_threadedBasebandSampleSources.pop_back(); - } else if (AddBasebandSampleSink::match(*message)) { const AddBasebandSampleSink *msg = (AddBasebandSampleSink *) message; diff --git a/sdrbase/dsp/dspdevicemimoengine.h b/sdrbase/dsp/dspdevicemimoengine.h index 3570d715c..4d84a7005 100644 --- a/sdrbase/dsp/dspdevicemimoengine.h +++ b/sdrbase/dsp/dspdevicemimoengine.h @@ -63,22 +63,6 @@ public: unsigned int m_index; }; - class AddSourceStream : public Message { - MESSAGE_CLASS_DECLARATION - }; - - class RemoveLastSourceStream : public Message { - MESSAGE_CLASS_DECLARATION - }; - - class AddSinkStream : public Message { - MESSAGE_CLASS_DECLARATION - }; - - class RemoveLastSinkStream : public Message { - MESSAGE_CLASS_DECLARATION - }; - class RemoveThreadedBasebandSampleSource : public Message { MESSAGE_CLASS_DECLARATION @@ -272,11 +256,6 @@ public: void setMIMOSequence(int sequence); //!< Set the sample MIMO sequence in type uint getUID() const { return m_uid; } - void addSourceStream(); - void removeLastSourceStream(); - void addSinkStream(); - void removeLastSinkStream(); - void addChannelSource(ThreadedBasebandSampleSource* source, int index = 0); //!< Add a channel source that will run on its own thread void removeChannelSource(ThreadedBasebandSampleSource* source, int index = 0); //!< Remove a channel source that runs on its own thread void addChannelSink(ThreadedBasebandSampleSink* sink, int index = 0); //!< Add a channel sink that will run on its own thread @@ -383,7 +362,7 @@ private: void run(); void workSampleSinkFifos(); //!< transfer samples of all sinks (sync mode) void workSampleSinkFifo(unsigned int stream); //!< transfer samples of one sink (async mode) - void workSamples(const SampleVector::iterator& vbegin, const SampleVector::iterator& vend, unsigned int sinkIndex); + void workSamples(const SampleVector::const_iterator& vbegin, const SampleVector::const_iterator& vend, unsigned int sinkIndex); State gotoIdle(); //!< Go to the idle state State gotoInit(); //!< Go to the acquisition init state from idle diff --git a/sdrbase/dsp/samplemififo.cpp b/sdrbase/dsp/samplemififo.cpp index b9a3fb8df..c27cd79b5 100644 --- a/sdrbase/dsp/samplemififo.cpp +++ b/sdrbase/dsp/samplemififo.cpp @@ -17,205 +17,494 @@ #include "samplemififo.h" -SampleMIFifo::SampleMIFifo(QObject *parent) : - QObject(parent) -{ -} - -SampleMIFifo::SampleMIFifo(unsigned int nbStreams, unsigned int size, QObject *parent) : - QObject(parent) -{ - for (unsigned int i = 0; i < nbStreams; i++) - { - m_data.push_back(SampleVector()); - m_data.back().resize(size); - m_vFill.push_back(0); - m_vHead.push_back(0); - } -} +#define MIN(x, y) (((x) < (y)) ? (x) : (y)) void SampleMIFifo::init(unsigned int nbStreams, unsigned int size) { - m_data.clear(); - m_vFill.clear(); - m_vHead.clear(); + m_nbStreams = nbStreams; + m_size = size; + m_fill = 0; + m_head = 0; + m_tail = 0; + m_data.resize(nbStreams); + m_vfill.resize(nbStreams); + m_vhead.resize(nbStreams); + m_vtail.resize(nbStreams); - for (unsigned int i = 0; i < nbStreams; i++) + for (unsigned int stream = 0; stream < nbStreams; stream++) { - m_data.push_back(SampleVector()); - m_data.back().resize(size); - m_vFill.push_back(0); - m_vHead.push_back(0); + m_data[stream].resize(size); + m_vfill[stream] = 0; + m_vhead[stream] = 0; + m_vtail[stream] = 0; } } -void SampleMIFifo::writeSync(const std::vector& vbegin, unsigned int size) +SampleMIFifo::SampleMIFifo(QObject* parent) : + QObject(parent) { - if ((m_data.size() == 0) || (m_data.size() != vbegin.size())) { - return; - } + m_suppressed = -1; + m_size = 0; + m_fill = 0; + m_head = 0; + m_tail = 0; +} - QMutexLocker mutexLocker(&m_mutex); +SampleMIFifo::SampleMIFifo(unsigned int nbStreams, unsigned int size, QObject* parent) : + QObject(parent) +{ + init(nbStreams, size); - for (unsigned int stream = 0; stream < m_data.size(); stream++) + m_suppressed = -1; + + for (unsigned int stream = 0; stream < nbStreams; stream++) { - int spaceLeft = m_data[stream].size() - m_vFill[stream]; + m_vsuppressed[stream] = -1; + m_vmsgRateTimer.push_back(QTime()); + } +} - if (size < spaceLeft) +unsigned int SampleMIFifo::writeSync(const quint8* data, unsigned int count) +{ + QMutexLocker mutexLocker(&m_mutex); + unsigned int total; + unsigned int remaining; + unsigned int len; + unsigned int byteCount = count; + count /= sizeof(Sample); + + total = std::min(count, m_size - m_fill); + + if (total < count) + { + if (m_suppressed < 0) { - std::copy(vbegin[stream], vbegin[stream] + size, m_data[stream].begin() + m_vFill[stream]); - m_vFill[stream] += size; - } + m_suppressed = 0; + m_msgRateTimer.start(); + qCritical("SampleMIFifo: overflow - dropping %u samples", count - total); + } else { - int remaining = size - spaceLeft; - std::copy(vbegin[stream], vbegin[stream] + spaceLeft, m_data[stream].begin() + m_vFill[stream]); - std::copy(vbegin[stream] + spaceLeft, vbegin[stream] + size, m_data[stream].begin()); - m_vFill[stream] = remaining; - } + if (m_msgRateTimer.elapsed() > 2500) + { + qCritical("SampleMIFifo: %u messages dropped", m_suppressed); + qCritical("SampleMIFifo: overflow - dropping %u samples", count - total); + m_suppressed = -1; + } + else + { + m_suppressed++; + } + } + } + + remaining = total; + std::vector vbegin; + vbegin.resize(m_nbStreams); + + for (unsigned int stream = 0; stream < m_nbStreams; stream++) { + vbegin[stream] = (const Sample*) &data[stream*byteCount]; } - emit dataSyncReady(); + while (remaining > 0) + { + len = std::min(remaining, m_size - m_tail); + + for (unsigned int stream = 0; stream < m_nbStreams; stream++) + { + std::copy(vbegin[stream], vbegin[stream] + len, m_data[stream].begin() + m_tail); + vbegin[stream] += len; + } + + m_tail += len; + m_tail %= m_size; + m_fill += len; + remaining -= len; + } + + if (m_fill > 0) { + emit dataSyncReady(); + } + + return total; } -void SampleMIFifo::writeAsync(unsigned int stream, const SampleVector::const_iterator& begin, unsigned int size) +unsigned int SampleMIFifo::writeSync(std::vector vbegin, unsigned int count) { - if (stream < m_data.size()) - { - QMutexLocker mutexLocker(&m_mutex); - int spaceLeft = m_data[stream].size() - m_vFill[stream]; + if ((vbegin.size() != m_nbStreams)) { + return 0; + } - if (size < spaceLeft) + QMutexLocker mutexLocker(&m_mutex); + unsigned int total; + unsigned int remaining; + unsigned int len; + + total = std::min(count, m_size - m_fill); + + if (total < count) + { + if (m_suppressed < 0) { - std::copy(begin, begin + size, m_data[stream].begin() + m_vFill[stream]); - m_vFill[stream] += size; - } + m_suppressed = 0; + m_msgRateTimer.start(); + qCritical("SampleMIFifo::writeSync: overflow - dropping %u samples", count - total); + } else { - int remaining = size - spaceLeft; - std::copy(begin, begin + spaceLeft, m_data[stream].begin() + m_vFill[stream]); - std::copy(begin + spaceLeft, begin + size, m_data[stream].begin()); - m_vFill[stream] = remaining; + if (m_msgRateTimer.elapsed() > 2500) + { + qCritical("SampleMIFifo::writeSync: %u messages dropped", m_suppressed); + qCritical("SampleMIFifo::writeSync: overflow - dropping %u samples", count - total); + m_suppressed = -1; + } + else + { + m_suppressed++; + } + } + } + + remaining = total; + + while (remaining > 0) + { + len = std::min(remaining, m_size - m_tail); + + for (unsigned int stream = 0; stream < m_nbStreams; stream++) + { + std::copy(vbegin[stream], vbegin[stream] + len, m_data[stream].begin() + m_tail); + vbegin[stream] += len; } - emit dataAsyncReady(stream); + m_tail += len; + m_tail %= m_size; + m_fill += len; + remaining -= len; + } + + if (m_fill > 0) { + emit dataSyncReady(); } + + return total; } -void SampleMIFifo::readSync( - std::vector& vpart1Begin, std::vector& vpart1End, - std::vector& vpart2Begin, std::vector& vpart2End -) +unsigned int SampleMIFifo::readSync(unsigned int count, + std::vector vpart1Begin, std::vector vpart1End, + std::vector vpart2Begin, std::vector vpart2End) { - QMutexLocker mutexLocker(&m_mutex); - std::vector::iterator dataIt = m_data.begin(); - int stream = 0; - - for (; dataIt != m_data.end(); ++dataIt, ++stream) + if ((vpart1Begin.size() != m_nbStreams) + || (vpart2Begin.size() != m_nbStreams) + || (vpart1End.size() != m_nbStreams) + || (vpart2End.size() != m_nbStreams)) { - if (m_vHead[stream] < m_vFill[stream]) + return 0; + } + + QMutexLocker mutexLocker(&m_mutex); + unsigned int total; + unsigned int remaining; + unsigned int len; + unsigned int head = m_head; + + total = std::min(count, m_fill); + + if (total < count) { + qCritical("SampleMIFifo::readSync: underflow - missing %u samples", count - total); + } + + remaining = total; + + if (remaining > 0) + { + len = std::min(remaining, m_size - head); + + for (unsigned int stream = 0; stream < m_nbStreams; stream++) { - vpart1Begin.push_back(dataIt->begin() + m_vHead[stream]); - vpart1End.push_back(dataIt->begin() + m_vFill[stream]); - vpart2Begin.push_back(dataIt->begin()); - vpart2End.push_back(dataIt->begin()); + *vpart1Begin[stream] = m_data[stream].begin() + head; + *vpart1End[stream] = m_data[stream].begin() + head + len; } + + head += len; + head %= m_size; + remaining -= len; + } + else + { + for (unsigned int stream = 0; stream < m_nbStreams; stream++) + { + *vpart1Begin[stream] = m_data[stream].end(); + *vpart1End[stream] = m_data[stream].end(); + } + } + + if (remaining > 0) + { + len = std::min(remaining, m_size - head); + + for (unsigned int stream = 0; stream < m_nbStreams; stream++) + { + *vpart2Begin[stream] = m_data[stream].begin() + head; + *vpart2End[stream] = m_data[stream].begin() + head + len; + } + } + else + { + for (unsigned int stream = 0; stream < m_nbStreams; stream++) + { + *vpart2Begin[stream] = m_data[stream].end(); + *vpart2End[stream] = m_data[stream].end(); + } + } + + return total; +} + +unsigned int SampleMIFifo::readSync(unsigned int count, + int& ipart1Begin, int& ipart1End, + int& ipart2Begin, int& ipart2End) +{ + QMutexLocker mutexLocker(&m_mutex); + unsigned int total; + unsigned int remaining; + unsigned int len; + unsigned int head = m_head; + + total = std::min(count, m_fill); + + if (total < count) { + qCritical("SampleMIFifo::readSync: underflow - missing %u samples", count - total); + } + + remaining = total; + + if (remaining > 0) + { + len = std::min(remaining, m_size - head); + ipart1Begin = head; + ipart1End =head + len; + head += len; + head %= m_size; + remaining -= len; + } + else + { + ipart1Begin = m_size; + ipart1End = m_size; + } + + if (remaining > 0) + { + len = std::min(remaining, m_size - head); + ipart2Begin = head; + ipart2End = head + len; + } + else + { + ipart2Begin = m_size; + ipart2End = m_size; + } + + return total; +} + +unsigned int SampleMIFifo::readCommitSync(unsigned int count) +{ + QMutexLocker mutexLocker(&m_mutex); + + if (count > m_fill) + { + qCritical("SampleMIFifo::readCommitSync: cannot commit more than available samples"); + count = m_fill; + } + + m_head = (m_head + count) % m_size; + m_fill -= count; + + return count; +} + +unsigned int SampleMIFifo::writeAsync(const quint8* data, unsigned int count, unsigned int stream) +{ + if (stream >= m_nbStreams) { + return 0; + } + + QMutexLocker mutexLocker(&m_mutex); + unsigned int total; + unsigned int remaining; + unsigned int len; + const Sample* begin = (const Sample*) data; + count /= sizeof(Sample); + total = std::min(count, m_size - m_vfill[stream]); + + if (total < count) + { + if (m_vsuppressed[stream] < 0) + { + m_vsuppressed[stream] = 0; + m_msgRateTimer.start(); + qCritical("SampleSinkFifo::writeAsync[%u]: overflow - dropping %u samples", stream, count - total); + } else { - vpart1Begin.push_back(dataIt->begin() + m_vHead[stream]); - vpart1End.push_back(dataIt->end()); - vpart2Begin.push_back(dataIt->begin()); - vpart2End.push_back(dataIt->begin() + m_vFill[stream]); - } + if (m_msgRateTimer.elapsed() > 2500) + { + qCritical("SampleSinkFifo::writeAsync[%u]: %u messages dropped", stream, m_vsuppressed[stream]); + qCritical("SampleSinkFifo::writeAsync[%u]: overflow - dropping %u samples", stream, count - total); + m_vsuppressed[stream] = -1; + } + else + { + m_vsuppressed[stream]++; + } + } + } - m_vHead[stream] = m_vFill[stream]; + remaining = total; + + while (remaining > 0) + { + len = std::min(remaining, m_size - m_vtail[stream]); + std::copy(begin, begin + len, m_data[stream].begin() + m_vtail[stream]); + begin += len; + m_vtail[stream] += len; + m_vtail[stream] %= m_size; + m_vfill[stream] += len; + remaining -= len; + } + + if (m_vfill[stream] > 0) { + emit dataAsyncReady(stream); } + + return total; } -void SampleMIFifo::readSync( - std::vector& vPart1Begin, std::vector& vPart1End, - std::vector& vPart2Begin, std::vector& vPart2End -) +unsigned int SampleMIFifo::writeAsync(SampleVector::const_iterator begin, unsigned int count, unsigned int stream) { - if (m_data.size() == 0) { - return; + if (stream >= m_nbStreams) { + return 0; } - QMutexLocker mutexLocker(&m_mutex); - std::vector::iterator dataIt = m_data.begin(); - int stream = 0; + QMutexLocker mutexLocker(&m_mutex); + unsigned int total; + unsigned int remaining; + unsigned int len; + total = std::min(count, m_size - m_vfill[stream]); - for (; dataIt != m_data.end(); ++dataIt, ++stream) + if (total < count) { - if (m_vHead[stream] < m_vFill[stream]) + if (m_vsuppressed[stream] < 0) { - vPart1Begin.push_back(m_vHead[stream]); - vPart1End.push_back(m_vFill[stream]); - vPart2Begin.push_back(0); - vPart2End.push_back(0); - } + m_vsuppressed[stream] = 0; + m_vmsgRateTimer[stream].start(); + qCritical("SampleSinkFifo::writeAsync[%u]: overflow - dropping %u samples", stream, count - total); + } else { - vPart1Begin.push_back(m_vHead[stream]); - vPart1End.push_back(dataIt->size()); - vPart2Begin.push_back(0); - vPart2End.push_back(m_vFill[stream]); - } + if (m_vmsgRateTimer[stream].elapsed() > 2500) + { + qCritical("SampleSinkFifo::writeAsync[%u]: %u messages dropped", stream, m_vsuppressed[stream]); + qCritical("SampleSinkFifo::writeAsync[%u]: overflow - dropping %u samples", stream, count - total); + m_vsuppressed[stream] = -1; + } + else + { + m_vsuppressed[stream]++; + } + } + } - m_vHead[stream] = m_vFill[stream]; - } -} + remaining = total; -void SampleMIFifo::readAsync(unsigned int stream, - SampleVector::const_iterator& part1Begin, SampleVector::const_iterator& part1End, - SampleVector::const_iterator& part2Begin, SampleVector::const_iterator& part2End) -{ - if (stream < m_data.size()) + while (remaining > 0) { - QMutexLocker mutexLocker(&m_mutex); + len = std::min(remaining, m_size - m_vtail[stream]); + std::copy(begin, begin + len, m_data[stream].begin() + m_vtail[stream]); + begin += len; + m_vtail[stream] += len; + m_vtail[stream] %= m_size; + m_vfill[stream] += len; + remaining -= len; + } - if (m_vHead[stream] < m_vFill[stream]) - { - part1Begin = m_data[stream].begin() + m_vHead[stream]; - part1End = m_data[stream].begin() + m_vFill[stream]; - part2Begin = m_data[stream].begin(); - part2End = m_data[stream].begin(); - } - else - { - part1Begin = m_data[stream].begin() + m_vHead[stream]; - part1End = m_data[stream].end(); - part2Begin = m_data[stream].begin(); - part2End = m_data[stream].begin() + m_vFill[stream]; - } - - m_vHead[stream] = m_vFill[stream]; + if (m_vfill[stream] > 0) { + emit dataAsyncReady(stream); } + + return total; } -bool SampleMIFifo::dataAvailable() +unsigned int SampleMIFifo::readAsync(unsigned int count, + SampleVector::const_iterator* part1Begin, SampleVector::const_iterator* part1End, + SampleVector::const_iterator* part2Begin, SampleVector::const_iterator* part2End, + unsigned int stream) { - if (m_data.size() == 0) { - return false; + if (stream >= m_nbStreams) { + return 0; } - QMutexLocker mutexLocker(&m_mutex); - bool value; - value = m_vHead[0] != m_vFill[0]; + QMutexLocker mutexLocker(&m_mutex); + unsigned int total; + unsigned int remaining; + unsigned int len; + unsigned int head = m_vhead[stream]; + total = std::min(count, m_vfill[stream]); - return value; + if (total < count) { + qCritical("SampleSinkFifo::readAsync[%u]: underflow - missing %u samples", stream, count - total); + } + + remaining = total; + + if (remaining > 0) + { + len = std::min(remaining, m_size - head); + *part1Begin = m_data[stream].begin() + head; + *part1End = m_data[stream].begin() + head + len; + head += len; + head %= m_size; + remaining -= len; + } + else + { + *part1Begin = m_data[stream].end(); + *part1End = m_data[stream].end(); + } + + if (remaining > 0) + { + len = std::min(remaining, m_size - head); + *part2Begin = m_data[stream].begin() + head; + *part2End = m_data[stream].begin() + head + len; + } + else + { + *part2Begin = m_data[stream].end(); + *part2End = m_data[stream].end(); + } + + return total; } -bool SampleMIFifo::dataAvailable(unsigned int stream) +unsigned int SampleMIFifo::readCommitAsync(unsigned int count, unsigned int stream) { - QMutexLocker mutexLocker(&m_mutex); - bool value; - - if (stream < m_data.size()) { - value = m_vHead[stream] != m_vFill[stream]; - } else { - value = false; + if (stream >= m_nbStreams) { + return 0; } - return value; + QMutexLocker mutexLocker(&m_mutex); + + if (count > m_vfill[stream]) + { + qCritical("SampleSinkFifo::readCommitAsync[%u]: cannot commit more than available samples", stream); + count = m_vfill[stream]; + } + + m_vhead[stream] = (m_vhead[stream] + count) % m_size; + m_vfill[stream] -= count; + + return count; } diff --git a/sdrbase/dsp/samplemififo.h b/sdrbase/dsp/samplemififo.h index fc6a4e985..e542ca9ef 100644 --- a/sdrbase/dsp/samplemififo.h +++ b/sdrbase/dsp/samplemififo.h @@ -20,7 +20,7 @@ #include #include -#include +#include #include "dsp/dsptypes.h" #include "export.h" @@ -28,37 +28,75 @@ class SDRBASE_API SampleMIFifo : public QObject { Q_OBJECT public: - SampleMIFifo(QObject *parent = nullptr); - SampleMIFifo(unsigned int nbStreams, unsigned int size, QObject *parent = nullptr); - void init(unsigned int nbStreams, unsigned int size); - void writeSync(const std::vector& vbegin, unsigned int size); - void writeAsync(unsigned int stream, const SampleVector::const_iterator& begin, unsigned int size); - void readSync( - std::vector& vpart1Begin, std::vector& vpart1End, - std::vector& vpart2Begin, std::vector& vpart2End - ); - void readSync( - std::vector& vPart1Begin, std::vector& vPart1End, - std::vector& vPart2Begin, std::vector& vPart2End - ); - void readAsync(unsigned int stream, - SampleVector::const_iterator& part1Begin, SampleVector::const_iterator& part1End, - SampleVector::const_iterator& part2Begin, SampleVector::const_iterator& part2End); - const std::vector& getData() { return m_data; } - unsigned int getNbStreams() const { return m_data.size(); } - bool dataAvailable(); - bool dataAvailable(unsigned int stream); + SampleMIFifo(QObject* parent = nullptr); + SampleMIFifo(unsigned int nbStreams, unsigned int size, QObject* parent = nullptr); + void init(unsigned int nbStreams, unsigned int size); + inline unsigned int size() const { return m_size; } + + inline unsigned int fillSync() + { + QMutexLocker mutexLocker(&m_mutex); + unsigned int fill = m_fill; + return fill; + } + + inline unsigned int fillAsync(unsigned int stream) + { + if (stream >= m_nbStreams) { + return 0; + } + + QMutexLocker mutexLocker(&m_mutex); + unsigned int fill = m_vfill[stream]; + return fill; + } + + const std::vector& getData() { return m_data; } + unsigned int getNbStreams() const { return m_nbStreams; } + + unsigned int writeSync(const quint8* data, unsigned int count); //!< de-interleaved data in input with count bytes for each stream + unsigned int writeSync(std::vector vbegin, unsigned int count); + unsigned int readSync(unsigned int count, + std::vector vpart1Begin, std::vector vpart1End, + std::vector vpart2Begin, std::vector vpart2End); + unsigned int readSync(unsigned int count, + int& ipart1Begin, int& ipart1End, + int& ipart2Begin, int& ipart2End); + unsigned int readCommitSync(unsigned int count); + + unsigned int writeAsync(const quint8* data, unsigned int count, unsigned int stream); + unsigned int writeAsync(SampleVector::const_iterator begin, unsigned int count, unsigned int stream); + unsigned int readAsync(unsigned int count, + SampleVector::const_iterator* part1Begin, SampleVector::const_iterator* part1End, + SampleVector::const_iterator* part2Begin, SampleVector::const_iterator* part2End, + unsigned int stream); + unsigned int readCommitAsync(unsigned int count, unsigned int stream); signals: void dataSyncReady(); - void dataAsyncReady(int streamIndex); + void dataAsyncReady(int streamIndex); private: - std::vector m_data; - std::vector m_vFill; //!< Number of samples written from beginning of samples vector - std::vector m_vHead; //!< Number of samples read from beginning of samples vector QMutex m_mutex; + + std::vector m_data; + unsigned int m_nbStreams; + unsigned int m_size; + + // Synchronous + int m_suppressed; + QTime m_msgRateTimer; + unsigned int m_fill; + unsigned int m_head; + unsigned int m_tail; + + // Asynchronous + std::vector m_vsuppressed; + std::vector m_vmsgRateTimer; + std::vector m_vfill; + std::vector m_vhead; + std::vector m_vtail; }; #endif // INCLUDE_SAMPLEMIFIFO_H