From 77ed5480341050c193b13cb9f1be7823e4d2abe1 Mon Sep 17 00:00:00 2001 From: f4exb Date: Mon, 27 Aug 2018 01:09:12 +0200 Subject: [PATCH] SDRdaemon: channel source mechanism with FEC recovery --- sdrdaemon/channel/sdrdaemonchannelsink.cpp | 16 +-- .../channel/sdrdaemonchannelsinkthread.cpp | 12 +- sdrdaemon/channel/sdrdaemonchannelsource.cpp | 105 +++++++++++++++++- sdrdaemon/channel/sdrdaemonchannelsource.h | 5 + .../channel/sdrdaemonchannelsourcethread.cpp | 72 ++++++++++-- .../channel/sdrdaemonchannelsourcethread.h | 7 +- sdrdaemon/channel/sdrdaemondatablock.h | 22 +++- 7 files changed, 209 insertions(+), 30 deletions(-) diff --git a/sdrdaemon/channel/sdrdaemonchannelsink.cpp b/sdrdaemon/channel/sdrdaemonchannelsink.cpp index 7a551fa0d..a10d6a771 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsink.cpp +++ b/sdrdaemon/channel/sdrdaemonchannelsink.cpp @@ -70,7 +70,7 @@ SDRDaemonChannelSink::SDRDaemonChannelSink(DeviceSourceAPI *deviceAPI) : SDRDaemonChannelSink::~SDRDaemonChannelSink() { m_dataBlockMutex.lock(); - if (m_dataBlock && !m_dataBlock->m_controlBlock.m_complete) { + if (m_dataBlock && !m_dataBlock->m_txControlBlock.m_complete) { delete m_dataBlock; } m_dataBlockMutex.unlock(); @@ -171,13 +171,13 @@ void SDRDaemonChannelSink::feed(const SampleVector::const_iterator& begin, const if (m_txBlockIndex == SDRDaemonNbOrginalBlocks - 1) // frame complete { m_dataBlockMutex.lock(); - m_dataBlock->m_controlBlock.m_frameIndex = m_frameCount; - m_dataBlock->m_controlBlock.m_processed = false; - m_dataBlock->m_controlBlock.m_complete = true; - m_dataBlock->m_controlBlock.m_nbBlocksFEC = m_nbBlocksFEC; - m_dataBlock->m_controlBlock.m_txDelay = m_txDelay; - m_dataBlock->m_controlBlock.m_dataAddress = m_dataAddress; - m_dataBlock->m_controlBlock.m_dataPort = m_dataPort; + m_dataBlock->m_txControlBlock.m_frameIndex = m_frameCount; + m_dataBlock->m_txControlBlock.m_processed = false; + m_dataBlock->m_txControlBlock.m_complete = true; + m_dataBlock->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC; + m_dataBlock->m_txControlBlock.m_txDelay = m_txDelay; + m_dataBlock->m_txControlBlock.m_dataAddress = m_dataAddress; + m_dataBlock->m_txControlBlock.m_dataPort = m_dataPort; m_dataQueue.push(m_dataBlock); m_dataBlock = new SDRDaemonDataBlock(); // create a new one immediately diff --git a/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp b/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp index e975dddcb..368a37e3c 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp +++ b/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp @@ -94,11 +94,11 @@ bool SDRDaemonChannelSinkThread::handleDataBlock(SDRDaemonDataBlock& dataBlock) CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder SDRDaemonProtectedBlock fecBlocks[256]; //!< FEC data - uint16_t frameIndex = dataBlock.m_controlBlock.m_frameIndex; - int nbBlocksFEC = dataBlock.m_controlBlock.m_nbBlocksFEC; - int txDelay = dataBlock.m_controlBlock.m_txDelay; - m_address.setAddress(dataBlock.m_controlBlock.m_dataAddress); - uint16_t dataPort = dataBlock.m_controlBlock.m_dataPort; + uint16_t frameIndex = dataBlock.m_txControlBlock.m_frameIndex; + int nbBlocksFEC = dataBlock.m_txControlBlock.m_nbBlocksFEC; + int txDelay = dataBlock.m_txControlBlock.m_txDelay; + m_address.setAddress(dataBlock.m_txControlBlock.m_dataAddress); + uint16_t dataPort = dataBlock.m_txControlBlock.m_dataPort; SDRDaemonSuperBlock *txBlockx = dataBlock.m_superBlocks; if ((nbBlocksFEC == 0) || !m_cm256) // Do not FEC encode @@ -158,7 +158,7 @@ bool SDRDaemonChannelSinkThread::handleDataBlock(SDRDaemonDataBlock& dataBlock) } } - dataBlock.m_controlBlock.m_processed = true; + dataBlock.m_txControlBlock.m_processed = true; return true; } diff --git a/sdrdaemon/channel/sdrdaemonchannelsource.cpp b/sdrdaemon/channel/sdrdaemonchannelsource.cpp index e16e6baf9..cc1a5af32 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsource.cpp +++ b/sdrdaemon/channel/sdrdaemonchannelsource.cpp @@ -20,6 +20,9 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// +#include +#include + #include #include "util/simpleserializer.h" @@ -53,6 +56,7 @@ SDRDaemonChannelSource::SDRDaemonChannelSource(DeviceSinkAPI *deviceAPI) : connect(&m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection); m_cm256p = m_cm256.isInitialized() ? &m_cm256 : 0; + m_currentMeta.init(); } SDRDaemonChannelSource::~SDRDaemonChannelSource() @@ -79,7 +83,7 @@ void SDRDaemonChannelSource::start() stop(); } - m_sourceThread = new SDRDaemonChannelSourceThread(&m_dataQueue, m_cm256p); + m_sourceThread = new SDRDaemonChannelSourceThread(&m_dataQueue); m_sourceThread->startStop(true); m_sourceThread->dataBind(m_dataAddress, m_dataPort); m_running = true; @@ -165,8 +169,91 @@ void SDRDaemonChannelSource::applySettings(const SDRDaemonChannelSourceSettings& m_settings = settings; } -bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock __attribute__((unused))) +bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock) { + if (dataBlock.m_rxControlBlock.m_blockCount < SDRDaemonNbOrginalBlocks) + { + qWarning("SDRDaemonChannelSource::handleDataBlock: incomplete data block: not processing"); + } + else + { + int blockCount = 0; + + for (int blockIndex = 0; blockIndex < 256; blockIndex++) + { + if ((blockIndex == 0) && (dataBlock.m_rxControlBlock.m_metaRetrieved)) + { + m_cm256DescriptorBlocks[blockCount].Index = 0; + m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock.m_superBlocks[0].m_protectedBlock); + blockCount++; + } + else if (dataBlock.m_superBlocks[blockIndex].m_header.m_blockIndex != 0) + { + m_cm256DescriptorBlocks[blockCount].Index = dataBlock.m_superBlocks[blockIndex].m_header.m_blockIndex; + m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock.m_superBlocks[blockIndex].m_protectedBlock); + blockCount++; + } + } + + //qDebug("SDRDaemonChannelSource::handleDataBlock: frame: %u blocks: %d", dataBlock.m_rxControlBlock.m_frameIndex, blockCount); + + // Need to use the CM256 recovery + if (m_cm256p &&(dataBlock.m_rxControlBlock.m_originalCount < SDRDaemonNbOrginalBlocks)) + { + qDebug("SDRDaemonChannelSource::handleDataBlock: %d recovery blocks", dataBlock.m_rxControlBlock.m_recoveryCount); + CM256::cm256_encoder_params paramsCM256; + paramsCM256.BlockBytes = sizeof(SDRDaemonProtectedBlock); // never changes + paramsCM256.OriginalCount = SDRDaemonNbOrginalBlocks; // never changes + + if (m_currentMeta.m_tv_sec == 0) { + paramsCM256.RecoveryCount = dataBlock.m_rxControlBlock.m_recoveryCount; + } else { + paramsCM256.RecoveryCount = m_currentMeta.m_nbFECBlocks; + } + + if (m_cm256.cm256_decode(paramsCM256, m_cm256DescriptorBlocks)) // CM256 decode + { + qWarning() << "SDRDaemonChannelSource::handleDataBlock: decode CM256 error:" + << " m_originalCount: " << dataBlock.m_rxControlBlock.m_originalCount + << " m_recoveryCount: " << dataBlock.m_rxControlBlock.m_recoveryCount; + } + else + { + for (int ir = 0; ir < dataBlock.m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks + { + int recoveryIndex = SDRDaemonNbOrginalBlocks - dataBlock.m_rxControlBlock.m_recoveryCount + ir; + int blockIndex = m_cm256DescriptorBlocks[recoveryIndex].Index; + SDRDaemonProtectedBlock *recoveredBlock = + (SDRDaemonProtectedBlock *) m_cm256DescriptorBlocks[recoveryIndex].Block; + memcpy((void *) &(dataBlock.m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(SDRDaemonProtectedBlock)); + if ((blockIndex == 0) && !dataBlock.m_rxControlBlock.m_metaRetrieved) { + dataBlock.m_rxControlBlock.m_metaRetrieved = true; + } + } + } + } + + // Validate block zero and retrieve its data + if (dataBlock.m_rxControlBlock.m_metaRetrieved) + { + SDRDaemonMetaDataFEC *metaData = (SDRDaemonMetaDataFEC *) &(dataBlock.m_superBlocks[0].m_protectedBlock); + boost::crc_32_type crc32; + crc32.process_bytes(metaData, 20); + + if (crc32.checksum() == metaData->m_crc32) + { + if (!(m_currentMeta == *metaData)) { + printMeta("SDRDaemonChannelSource::handleDataBlock", metaData); + } + + m_currentMeta = *metaData; + } + else + { + qWarning() << "SDRDaemonChannelSource::handleDataBlock: recovered meta: invalid CRC32"; + } + } + } //TODO: Push into R/W buffer return true; } @@ -183,3 +270,17 @@ void SDRDaemonChannelSource::handleData() } } } + +void SDRDaemonChannelSource::printMeta(const QString& header, SDRDaemonMetaDataFEC *metaData) +{ + qDebug() << header << ": " + << "|" << metaData->m_centerFrequency + << ":" << metaData->m_sampleRate + << ":" << (int) (metaData->m_sampleBytes & 0xF) + << ":" << (int) metaData->m_sampleBits + << ":" << (int) metaData->m_nbOriginalBlocks + << ":" << (int) metaData->m_nbFECBlocks + << "|" << metaData->m_tv_sec + << ":" << metaData->m_tv_usec + << "|"; +} diff --git a/sdrdaemon/channel/sdrdaemonchannelsource.h b/sdrdaemon/channel/sdrdaemonchannelsource.h index be1e73e71..433bf8250 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsource.h +++ b/sdrdaemon/channel/sdrdaemonchannelsource.h @@ -29,6 +29,7 @@ #include "channel/channelsourceapi.h" #include "channel/sdrdaemonchannelsourcesettings.h" #include "channel/sdrdaemondataqueue.h" +#include "channel/sdrdaemondatablock.h" class ThreadedBasebandSampleSource; class UpChannelizer; @@ -97,8 +98,12 @@ private: QString m_dataAddress; uint16_t m_dataPort; + CM256::cm256_block m_cm256DescriptorBlocks[2*SDRDaemonNbOrginalBlocks]; //!< CM256 decoder descriptors (block addresses and block indexes) + SDRDaemonMetaDataFEC m_currentMeta; + void applySettings(const SDRDaemonChannelSourceSettings& settings, bool force = false); bool handleDataBlock(SDRDaemonDataBlock& dataBlock); + void printMeta(const QString& header, SDRDaemonMetaDataFEC *metaData); private slots: void handleData(); diff --git a/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp b/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp index f644a9523..533ace7d2 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp +++ b/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp @@ -20,6 +20,8 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// +#include + #include #include "channel/sdrdaemondataqueue.h" @@ -31,14 +33,14 @@ MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSourceThread::MsgStartStop, Message) MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSourceThread::MsgDataBind, Message) -SDRDaemonChannelSourceThread::SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent) : +SDRDaemonChannelSourceThread::SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, QObject* parent) : QThread(parent), m_running(false), m_dataQueue(dataQueue), - m_cm256(cm256), m_address(QHostAddress::LocalHost), m_socket(0) { + std::fill(m_dataBlocks, m_dataBlocks+4, (SDRDaemonDataBlock *) 0); connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); } @@ -131,17 +133,69 @@ void SDRDaemonChannelSourceThread::handleInputMessages() void SDRDaemonChannelSourceThread::readPendingDatagrams() { - char data[1024]; + SDRDaemonSuperBlock superBlock; + qint64 size; + while (m_socket->hasPendingDatagrams()) { QHostAddress sender; quint16 senderPort = 0; - qint64 pendingDataSize = m_socket->pendingDatagramSize(); - m_socket->readDatagram(data, pendingDataSize, &sender, &senderPort); - qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: %lld bytes received from %s:%d", - pendingDataSize, - qPrintable(sender.toString()), - senderPort); + //qint64 pendingDataSize = m_socket->pendingDatagramSize(); + size = m_socket->readDatagram((char *) &superBlock, (long long int) sizeof(SDRDaemonSuperBlock), &sender, &senderPort); + if (size == sizeof(SDRDaemonSuperBlock)) + { + unsigned int dataBlockIndex = superBlock.m_header.m_frameIndex % m_nbDataBlocks; + + // create the first block for this index + if (m_dataBlocks[dataBlockIndex] == 0) { + m_dataBlocks[dataBlockIndex] = new SDRDaemonDataBlock(); + } + + if (m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex < 0) + { + // initialize virgin block with the frame index + m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex; + } + else + { + // if the frame index is not the same for the same slot it means we are starting a new frame + uint32_t frameIndex = m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex; + + if (superBlock.m_header.m_frameIndex != frameIndex) + { + //qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: push frame %u", frameIndex); + m_dataQueue->push(m_dataBlocks[dataBlockIndex]); + m_dataBlocks[dataBlockIndex] = new SDRDaemonDataBlock(); + m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex; + } + } + + m_dataBlocks[dataBlockIndex]->m_superBlocks[superBlock.m_header.m_blockIndex] = superBlock; + + if (superBlock.m_header.m_blockIndex == 0) { + m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_metaRetrieved = true; + } + + if (superBlock.m_header.m_blockIndex < SDRDaemonNbOrginalBlocks) { + m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_originalCount++; + } else { + m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_recoveryCount++; + } + + m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_blockCount++; + +// // if enough data blocks to decode push into data queue +// if (m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_blockCount == SDRDaemonNbOrginalBlocks) +// { +// //qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: push frame %u", superBlock.m_header.m_frameIndex); +// m_dataQueue->push(m_dataBlocks[dataBlockIndex]); +// m_dataBlocks[dataBlockIndex] = new SDRDaemonDataBlock(); +// } + } + else + { + qWarning("SDRDaemonChannelSourceThread::readPendingDatagrams: wrong super block size not processing"); + } } } diff --git a/sdrdaemon/channel/sdrdaemonchannelsourcethread.h b/sdrdaemon/channel/sdrdaemonchannelsourcethread.h index de4767578..37892d69b 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsourcethread.h +++ b/sdrdaemon/channel/sdrdaemonchannelsourcethread.h @@ -33,7 +33,6 @@ class SDRDaemonDataQueue; class SDRDaemonDataBlock; -class CM256; class QUdpSocket; class SDRDaemonChannelSourceThread : public QThread { @@ -81,7 +80,7 @@ public: } }; - SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent = 0); + SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, QObject* parent = 0); ~SDRDaemonChannelSourceThread(); void startStop(bool start); @@ -94,11 +93,13 @@ private: MessageQueue m_inputMessageQueue; SDRDaemonDataQueue *m_dataQueue; - CM256 *m_cm256; //!< CM256 library object QHostAddress m_address; QUdpSocket *m_socket; + static const uint32_t m_nbDataBlocks = 4; //!< number of data blocks in the ring buffer + SDRDaemonDataBlock *m_dataBlocks[m_nbDataBlocks]; //!< ring buffer of data blocks indexed by frame affinity + void startWork(); void stopWork(); diff --git a/sdrdaemon/channel/sdrdaemondatablock.h b/sdrdaemon/channel/sdrdaemondatablock.h index b8aec1a71..af7a635b0 100644 --- a/sdrdaemon/channel/sdrdaemondatablock.h +++ b/sdrdaemon/channel/sdrdaemondatablock.h @@ -57,7 +57,7 @@ struct SDRDaemonMetaDataFEC m_sampleBytes = 0; m_sampleBits = 0; m_nbOriginalBlocks = 0; - m_nbFECBlocks = -1; + m_nbFECBlocks = 0; m_tv_sec = 0; m_tv_usec = 0; m_crc32 = 0; @@ -125,6 +125,23 @@ struct SDRDaemonTxControlBlock } }; +struct SDRDaemonRxControlBlock +{ + int m_blockCount; //!< number of blocks received for this frame + int m_originalCount; //!< number of original blocks received + int m_recoveryCount; //!< number of recovery blocks received + bool m_metaRetrieved; //!< true if meta data (block zero) was retrieved + int m_frameIndex; //!< this frame index or -1 if unset + + SDRDaemonRxControlBlock() { + m_blockCount = 0; + m_originalCount = 0; + m_recoveryCount = 0; + m_metaRetrieved = false; + m_frameIndex = -1; + } +}; + class SDRDaemonDataBlock { public: @@ -134,7 +151,8 @@ public: ~SDRDaemonDataBlock() { delete[] m_superBlocks; } - SDRDaemonTxControlBlock m_controlBlock; + SDRDaemonTxControlBlock m_txControlBlock; + SDRDaemonRxControlBlock m_rxControlBlock; SDRDaemonSuperBlock *m_superBlocks; };