From 8196a70753b00ea290f58893e3069080642f3fc9 Mon Sep 17 00:00:00 2001 From: f4exb Date: Tue, 28 Aug 2018 06:33:15 +0200 Subject: [PATCH] SDRdaemon: data read queue --- sdrdaemon/CMakeLists.txt | 4 +- sdrdaemon/channel/sdrdaemonchannelsource.cpp | 54 +++---- sdrdaemon/channel/sdrdaemonchannelsource.h | 5 +- .../channel/sdrdaemonchannelsourcebuffer.cpp | 85 ----------- sdrdaemon/channel/sdrdaemondatablock.h | 1 + sdrdaemon/channel/sdrdaemondatareadqueue.cpp | 142 ++++++++++++++++++ ...ourcebuffer.h => sdrdaemondatareadqueue.h} | 39 +++-- 7 files changed, 198 insertions(+), 132 deletions(-) delete mode 100644 sdrdaemon/channel/sdrdaemonchannelsourcebuffer.cpp create mode 100644 sdrdaemon/channel/sdrdaemondatareadqueue.cpp rename sdrdaemon/channel/{sdrdaemonchannelsourcebuffer.h => sdrdaemondatareadqueue.h} (67%) diff --git a/sdrdaemon/CMakeLists.txt b/sdrdaemon/CMakeLists.txt index fa97be443..edbd3aa9a 100644 --- a/sdrdaemon/CMakeLists.txt +++ b/sdrdaemon/CMakeLists.txt @@ -8,11 +8,11 @@ set(sdrdaemon_SOURCES channel/sdrdaemonchannelsink.cpp channel/sdrdaemonchannelsource.cpp channel/sdrdaemondataqueue.cpp + channel/sdrdaemondatareadqueue.cpp channel/sdrdaemonchannelsinkthread.cpp channel/sdrdaemonchannelsinksettings.cpp channel/sdrdaemonchannelsourcesettings.cpp channel/sdrdaemonchannelsourcethread.cpp - channel/sdrdaemonchannelsourcebuffer.cpp webapi/webapiadapterdaemon.cpp webapi/webapirequestmapper.cpp webapi/webapiserver.cpp @@ -26,12 +26,12 @@ set(sdrdaemon_HEADERS channel/sdrdaemonchannelsink.h channel/sdrdaemonchannelsource.h channel/sdrdaemondataqueue.h + channel/sdrdaemondatareadqueue.h channel/sdrdaemondatablock.h channel/sdrdaemonchannelsinkthread.h channel/sdrdaemonchannelsinksettings.h channel/sdrdaemonchannelsourcesettings.h channel/sdrdaemonchannelsourcethread.h - channel/sdrdaemonchannelsourcebuffer.h webapi/webapiadapterdaemon.h webapi/webapirequestmapper.h webapi/webapiserver.h diff --git a/sdrdaemon/channel/sdrdaemonchannelsource.cpp b/sdrdaemon/channel/sdrdaemonchannelsource.cpp index 511b6f49d..f8e28081f 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsource.cpp +++ b/sdrdaemon/channel/sdrdaemonchannelsource.cpp @@ -70,9 +70,7 @@ SDRDaemonChannelSource::~SDRDaemonChannelSource() void SDRDaemonChannelSource::pull(Sample& sample) { - sample.m_real = 0.0f; - sample.m_imag = 0.0f; - + m_dataReadQueue.readSample(sample); m_samplesCount++; } @@ -182,9 +180,9 @@ void SDRDaemonChannelSource::applySettings(const SDRDaemonChannelSourceSettings& m_settings = settings; } -bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock) +void SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock* dataBlock) { - if (dataBlock.m_rxControlBlock.m_blockCount < SDRDaemonNbOrginalBlocks) + if (dataBlock->m_rxControlBlock.m_blockCount < SDRDaemonNbOrginalBlocks) { qWarning("SDRDaemonChannelSource::handleDataBlock: incomplete data block: not processing"); } @@ -194,16 +192,16 @@ bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock) for (int blockIndex = 0; blockIndex < 256; blockIndex++) { - if ((blockIndex == 0) && (dataBlock.m_rxControlBlock.m_metaRetrieved)) + if ((blockIndex == 0) && (dataBlock->m_rxControlBlock.m_metaRetrieved)) { m_cm256DescriptorBlocks[blockCount].Index = 0; - m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock.m_superBlocks[0].m_protectedBlock); + m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock->m_superBlocks[0].m_protectedBlock); blockCount++; } - else if (dataBlock.m_superBlocks[blockIndex].m_header.m_blockIndex != 0) + else if (dataBlock->m_superBlocks[blockIndex].m_header.m_blockIndex != 0) { - m_cm256DescriptorBlocks[blockCount].Index = dataBlock.m_superBlocks[blockIndex].m_header.m_blockIndex; - m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock.m_superBlocks[blockIndex].m_protectedBlock); + m_cm256DescriptorBlocks[blockCount].Index = dataBlock->m_superBlocks[blockIndex].m_header.m_blockIndex; + m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock->m_superBlocks[blockIndex].m_protectedBlock); blockCount++; } } @@ -211,15 +209,15 @@ bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock) //qDebug("SDRDaemonChannelSource::handleDataBlock: frame: %u blocks: %d", dataBlock.m_rxControlBlock.m_frameIndex, blockCount); // Need to use the CM256 recovery - if (m_cm256p &&(dataBlock.m_rxControlBlock.m_originalCount < SDRDaemonNbOrginalBlocks)) + if (m_cm256p &&(dataBlock->m_rxControlBlock.m_originalCount < SDRDaemonNbOrginalBlocks)) { - qDebug("SDRDaemonChannelSource::handleDataBlock: %d recovery blocks", dataBlock.m_rxControlBlock.m_recoveryCount); + qDebug("SDRDaemonChannelSource::handleDataBlock: %d recovery blocks", dataBlock->m_rxControlBlock.m_recoveryCount); CM256::cm256_encoder_params paramsCM256; paramsCM256.BlockBytes = sizeof(SDRDaemonProtectedBlock); // never changes paramsCM256.OriginalCount = SDRDaemonNbOrginalBlocks; // never changes if (m_currentMeta.m_tv_sec == 0) { - paramsCM256.RecoveryCount = dataBlock.m_rxControlBlock.m_recoveryCount; + paramsCM256.RecoveryCount = dataBlock->m_rxControlBlock.m_recoveryCount; } else { paramsCM256.RecoveryCount = m_currentMeta.m_nbFECBlocks; } @@ -227,29 +225,29 @@ bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock) if (m_cm256.cm256_decode(paramsCM256, m_cm256DescriptorBlocks)) // CM256 decode { qWarning() << "SDRDaemonChannelSource::handleDataBlock: decode CM256 error:" - << " m_originalCount: " << dataBlock.m_rxControlBlock.m_originalCount - << " m_recoveryCount: " << dataBlock.m_rxControlBlock.m_recoveryCount; + << " m_originalCount: " << dataBlock->m_rxControlBlock.m_originalCount + << " m_recoveryCount: " << dataBlock->m_rxControlBlock.m_recoveryCount; } else { - for (int ir = 0; ir < dataBlock.m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks + for (int ir = 0; ir < dataBlock->m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks { - int recoveryIndex = SDRDaemonNbOrginalBlocks - dataBlock.m_rxControlBlock.m_recoveryCount + ir; + int recoveryIndex = SDRDaemonNbOrginalBlocks - dataBlock->m_rxControlBlock.m_recoveryCount + ir; int blockIndex = m_cm256DescriptorBlocks[recoveryIndex].Index; SDRDaemonProtectedBlock *recoveredBlock = (SDRDaemonProtectedBlock *) m_cm256DescriptorBlocks[recoveryIndex].Block; - memcpy((void *) &(dataBlock.m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(SDRDaemonProtectedBlock)); - if ((blockIndex == 0) && !dataBlock.m_rxControlBlock.m_metaRetrieved) { - dataBlock.m_rxControlBlock.m_metaRetrieved = true; + memcpy((void *) &(dataBlock->m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(SDRDaemonProtectedBlock)); + if ((blockIndex == 0) && !dataBlock->m_rxControlBlock.m_metaRetrieved) { + dataBlock->m_rxControlBlock.m_metaRetrieved = true; } } } } // Validate block zero and retrieve its data - if (dataBlock.m_rxControlBlock.m_metaRetrieved) + if (dataBlock->m_rxControlBlock.m_metaRetrieved) { - SDRDaemonMetaDataFEC *metaData = (SDRDaemonMetaDataFEC *) &(dataBlock.m_superBlocks[0].m_protectedBlock); + SDRDaemonMetaDataFEC *metaData = (SDRDaemonMetaDataFEC *) &(dataBlock->m_superBlocks[0].m_protectedBlock); boost::crc_32_type crc32; crc32.process_bytes(metaData, 20); @@ -275,21 +273,17 @@ bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock) qWarning() << "SDRDaemonChannelSource::handleDataBlock: recovered meta: invalid CRC32"; } } + + m_dataReadQueue.push(dataBlock); // Push into R/W buffer } - //TODO: Push into R/W buffer - return true; } void SDRDaemonChannelSource::handleData() { SDRDaemonDataBlock* dataBlock; - while (m_running && ((dataBlock = m_dataQueue.pop()) != 0)) - { - if (handleDataBlock(*dataBlock)) - { - delete dataBlock; - } + while (m_running && ((dataBlock = m_dataQueue.pop()) != 0)) { + handleDataBlock(dataBlock); } } diff --git a/sdrdaemon/channel/sdrdaemonchannelsource.h b/sdrdaemon/channel/sdrdaemonchannelsource.h index 433bf8250..7b2438014 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsource.h +++ b/sdrdaemon/channel/sdrdaemonchannelsource.h @@ -30,6 +30,7 @@ #include "channel/sdrdaemonchannelsourcesettings.h" #include "channel/sdrdaemondataqueue.h" #include "channel/sdrdaemondatablock.h" +#include "channel/sdrdaemondatareadqueue.h" class ThreadedBasebandSampleSource; class UpChannelizer; @@ -101,8 +102,10 @@ private: CM256::cm256_block m_cm256DescriptorBlocks[2*SDRDaemonNbOrginalBlocks]; //!< CM256 decoder descriptors (block addresses and block indexes) SDRDaemonMetaDataFEC m_currentMeta; + SDRDaemonDataReadQueue m_dataReadQueue; + void applySettings(const SDRDaemonChannelSourceSettings& settings, bool force = false); - bool handleDataBlock(SDRDaemonDataBlock& dataBlock); + void handleDataBlock(SDRDaemonDataBlock *dataBlock); void printMeta(const QString& header, SDRDaemonMetaDataFEC *metaData); private slots: diff --git a/sdrdaemon/channel/sdrdaemonchannelsourcebuffer.cpp b/sdrdaemon/channel/sdrdaemonchannelsourcebuffer.cpp deleted file mode 100644 index 8cc2112e9..000000000 --- a/sdrdaemon/channel/sdrdaemonchannelsourcebuffer.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////////// -// Copyright (C) 2018 Edouard Griffiths, F4EXB. // -// // -// SDRdaemon source channel (Tx). Samples buffer // -// // -// SDRdaemon is a detached SDR front end that handles the interface with a // -// physical device and sends or receives the I/Q samples stream to or from a // -// SDRangel instance via UDP. It is controlled via a Web REST API. // -// // -// This program is free software; you can redistribute it and/or modify // -// it under the terms of the GNU General Public License as published by // -// the Free Software Foundation as version 3 of the License, or // -// // -// This program is distributed in the hope that it will be useful, // -// but WITHOUT ANY WARRANTY; without even the implied warranty of // -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // -// GNU General Public License V3 for more details. // -// // -// You should have received a copy of the GNU General Public License // -// along with this program. If not, see . // -/////////////////////////////////////////////////////////////////////////////////// - -#include "channel/sdrdaemonchannelsourcebuffer.h" - -SDRDaemonChannelSourceBuffer::SDRDaemonChannelSourceBuffer(uint32_t nbSamples) -{ - m_buffer = new Sample[nbSamples]; - m_wp = nbSamples/2; - m_rp = 0; - m_size = nbSamples; -} - -SDRDaemonChannelSourceBuffer::~SDRDaemonChannelSourceBuffer() -{ - delete[] m_buffer; -} - -void SDRDaemonChannelSourceBuffer::resize(uint32_t nbSamples) -{ - if (nbSamples > m_size) - { - delete[] m_buffer; - m_buffer = new Sample[nbSamples]; - } - - m_wp = nbSamples/2; - m_rp = 0; - m_size = nbSamples; -} - -void SDRDaemonChannelSourceBuffer::write(Sample *begin, uint32_t nbSamples) -{ - if (m_wp + nbSamples < m_size) - { - std::copy(begin, begin+nbSamples, &m_buffer[m_wp]); - m_wp += nbSamples; - } - else // wrap - { - int first = m_size - m_wp; - std::copy(begin, begin+first, &m_buffer[m_wp]); - int second = nbSamples - first; - std::copy(begin+first, begin+nbSamples, m_buffer); - m_wp = second; - } -} - -void SDRDaemonChannelSourceBuffer::readOne(Sample& sample) -{ - sample = m_buffer[m_rp]; - m_rp++; - - if (m_rp == m_size) { // wrap - m_rp = 0; - } -} - -int SDRDaemonChannelSourceBuffer::getRWBalancePercent() -{ - if (m_wp > m_rp) { - return (((m_wp - m_rp) - (m_size/2))*100)/m_size; - } else { - return (((m_size/2) - (m_rp - m_wp))*100)/m_size; - } -} \ No newline at end of file diff --git a/sdrdaemon/channel/sdrdaemondatablock.h b/sdrdaemon/channel/sdrdaemondatablock.h index 31dfe66ae..bf1ab552f 100644 --- a/sdrdaemon/channel/sdrdaemondatablock.h +++ b/sdrdaemon/channel/sdrdaemondatablock.h @@ -26,6 +26,7 @@ #include #include #include +#include #include "dsp/dsptypes.h" #define UDPSINKFEC_UDPSIZE 512 diff --git a/sdrdaemon/channel/sdrdaemondatareadqueue.cpp b/sdrdaemon/channel/sdrdaemondatareadqueue.cpp new file mode 100644 index 000000000..7cc76b227 --- /dev/null +++ b/sdrdaemon/channel/sdrdaemondatareadqueue.cpp @@ -0,0 +1,142 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2018 Edouard Griffiths, F4EXB. // +// // +// SDRdaemon sink channel (Rx) data blocks to read queue // +// // +// SDRdaemon is a detached SDR front end that handles the interface with a // +// physical device and sends or receives the I/Q samples stream to or from a // +// SDRangel instance via UDP. It is controlled via a Web REST API. // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "channel/sdrdaemondatablock.h" +#include "channel/sdrdaemondatareadqueue.h" + +const uint32_t SDRDaemonDataReadQueue::MaxSize = 20; + +SDRDaemonDataReadQueue::SDRDaemonDataReadQueue() : + m_dataBlock(0), + m_blockIndex(1), + m_sampleIndex(0) +{} + +SDRDaemonDataReadQueue::~SDRDaemonDataReadQueue() +{ + SDRDaemonDataBlock* data; + + while ((data = pop()) != 0) + { + qDebug("SDRDaemonDataReadQueue::~SDRDaemonDataReadQueue: data block was still in queue"); + delete data; + } +} + +void SDRDaemonDataReadQueue::push(SDRDaemonDataBlock* dataBlock) +{ + if (size() > MaxSize) + { + qWarning("SDRDaemonDataReadQueue::push: queue is full"); + SDRDaemonDataBlock *data = m_dataReadQueue.takeLast(); + delete data; + } + + m_dataReadQueue.append(dataBlock); +} + +SDRDaemonDataBlock* SDRDaemonDataReadQueue::pop() +{ + if (m_dataReadQueue.isEmpty()) + { + return 0; + } + else + { + m_blockIndex = 1; + m_sampleIndex = 0; + + return m_dataReadQueue.takeFirst(); + } +} + +uint32_t SDRDaemonDataReadQueue::size() +{ + return m_dataReadQueue.size(); +} + +void SDRDaemonDataReadQueue::readSample(Sample& s) +{ + // initial state + if (m_dataBlock == 0) + { + if (size() >= MaxSize/2) + { + qDebug("SDRDaemonDataReadQueue::readSample: initial pop new block: queue size: %u", size()); + m_blockIndex = 1; + m_dataBlock = m_dataReadQueue.takeFirst(); + s = m_dataBlock->m_superBlocks[m_blockIndex].m_protectedBlock.m_samples[m_sampleIndex]; + m_sampleIndex++; + } + else + { + s = Sample{0, 0}; + } + + return; + } + + if (m_sampleIndex < SDRDaemonSamplesPerBlock) + { + s = m_dataBlock->m_superBlocks[m_blockIndex].m_protectedBlock.m_samples[m_sampleIndex]; + m_sampleIndex++; + } + else + { + m_sampleIndex = 0; + m_blockIndex++; + + if (m_blockIndex < SDRDaemonNbOrginalBlocks) + { + s = m_dataBlock->m_superBlocks[m_blockIndex].m_protectedBlock.m_samples[m_sampleIndex]; + m_sampleIndex++; + } + else + { + if (m_dataBlock) + { + delete m_dataBlock; + m_dataBlock = 0; + + if (size() == 0) { + qWarning("SDRDaemonDataReadQueue::readSample: try to pop new block but queue is empty"); + } + } + + if (size() > 0) + { + qDebug("SDRDaemonDataReadQueue::readSample: pop new block: queue size: %u", size()); + m_blockIndex = 1; + m_dataBlock = m_dataReadQueue.takeFirst(); + s = m_dataBlock->m_superBlocks[m_blockIndex].m_protectedBlock.m_samples[m_sampleIndex]; + m_sampleIndex++; + } + else + { + s = Sample{0, 0}; + } + } + } +} + + + diff --git a/sdrdaemon/channel/sdrdaemonchannelsourcebuffer.h b/sdrdaemon/channel/sdrdaemondatareadqueue.h similarity index 67% rename from sdrdaemon/channel/sdrdaemonchannelsourcebuffer.h rename to sdrdaemon/channel/sdrdaemondatareadqueue.h index 7ec10bde7..19c0687b1 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsourcebuffer.h +++ b/sdrdaemon/channel/sdrdaemondatareadqueue.h @@ -1,7 +1,7 @@ /////////////////////////////////////////////////////////////////////////////////// // Copyright (C) 2018 Edouard Griffiths, F4EXB. // // // -// SDRdaemon source channel (Tx). Samples buffer // +// SDRdaemon sink channel (Rx) data blocks to read queue // // // // SDRdaemon is a detached SDR front end that handles the interface with a // // physical device and sends or receives the I/Q samples stream to or from a // @@ -20,23 +20,34 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// -#include +#ifndef SDRDAEMON_CHANNEL_SDRDAEMONDATAREADQUEUE_H_ +#define SDRDAEMON_CHANNEL_SDRDAEMONDATAREADQUEUE_H_ -#include "dsp/dsptypes.h" +#include -class SDRDaemonChannelSourceBuffer +class SDRDaemonDataBlock; +class Sample; + +class SDRDaemonDataReadQueue { public: - SDRDaemonChannelSourceBuffer(uint32_t nbSamples); - ~SDRDaemonChannelSourceBuffer(); - void resize(uint32_t nbSamples); - void write(Sample *begin, uint32_t nbSamples); - void readOne(Sample& sample); - int getRWBalancePercent(); //!< positive write leads, negative read leads, balance = 0, percentage of buffer size + SDRDaemonDataReadQueue(); + ~SDRDaemonDataReadQueue(); + + void push(SDRDaemonDataBlock* dataBlock); //!< push block on the queue + SDRDaemonDataBlock* pop(); //!< Pop block from the queue + void readSample(Sample& s); //!< Read sample from queue + uint32_t size(); //!< Returns queue size + + static const uint32_t MaxSize; private: - Sample *m_buffer; - uint32_t m_size; - uint32_t m_rp; - uint32_t m_wp; + QQueue m_dataReadQueue; + SDRDaemonDataBlock *m_dataBlock; + uint32_t m_blockIndex; + uint32_t m_sampleIndex; }; + + + +#endif /* SDRDAEMON_CHANNEL_SDRDAEMONDATAREADQUEUE_H_ */