SDRdaemon: channel source mechanism with FEC recovery

This commit is contained in:
f4exb 2018-08-27 01:09:12 +02:00
parent f30dcf7753
commit 77ed548034
7 changed files with 209 additions and 30 deletions

View File

@ -70,7 +70,7 @@ SDRDaemonChannelSink::SDRDaemonChannelSink(DeviceSourceAPI *deviceAPI) :
SDRDaemonChannelSink::~SDRDaemonChannelSink()
{
m_dataBlockMutex.lock();
if (m_dataBlock && !m_dataBlock->m_controlBlock.m_complete) {
if (m_dataBlock && !m_dataBlock->m_txControlBlock.m_complete) {
delete m_dataBlock;
}
m_dataBlockMutex.unlock();
@ -171,13 +171,13 @@ void SDRDaemonChannelSink::feed(const SampleVector::const_iterator& begin, const
if (m_txBlockIndex == SDRDaemonNbOrginalBlocks - 1) // frame complete
{
m_dataBlockMutex.lock();
m_dataBlock->m_controlBlock.m_frameIndex = m_frameCount;
m_dataBlock->m_controlBlock.m_processed = false;
m_dataBlock->m_controlBlock.m_complete = true;
m_dataBlock->m_controlBlock.m_nbBlocksFEC = m_nbBlocksFEC;
m_dataBlock->m_controlBlock.m_txDelay = m_txDelay;
m_dataBlock->m_controlBlock.m_dataAddress = m_dataAddress;
m_dataBlock->m_controlBlock.m_dataPort = m_dataPort;
m_dataBlock->m_txControlBlock.m_frameIndex = m_frameCount;
m_dataBlock->m_txControlBlock.m_processed = false;
m_dataBlock->m_txControlBlock.m_complete = true;
m_dataBlock->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC;
m_dataBlock->m_txControlBlock.m_txDelay = m_txDelay;
m_dataBlock->m_txControlBlock.m_dataAddress = m_dataAddress;
m_dataBlock->m_txControlBlock.m_dataPort = m_dataPort;
m_dataQueue.push(m_dataBlock);
m_dataBlock = new SDRDaemonDataBlock(); // create a new one immediately

View File

@ -94,11 +94,11 @@ bool SDRDaemonChannelSinkThread::handleDataBlock(SDRDaemonDataBlock& dataBlock)
CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder
SDRDaemonProtectedBlock fecBlocks[256]; //!< FEC data
uint16_t frameIndex = dataBlock.m_controlBlock.m_frameIndex;
int nbBlocksFEC = dataBlock.m_controlBlock.m_nbBlocksFEC;
int txDelay = dataBlock.m_controlBlock.m_txDelay;
m_address.setAddress(dataBlock.m_controlBlock.m_dataAddress);
uint16_t dataPort = dataBlock.m_controlBlock.m_dataPort;
uint16_t frameIndex = dataBlock.m_txControlBlock.m_frameIndex;
int nbBlocksFEC = dataBlock.m_txControlBlock.m_nbBlocksFEC;
int txDelay = dataBlock.m_txControlBlock.m_txDelay;
m_address.setAddress(dataBlock.m_txControlBlock.m_dataAddress);
uint16_t dataPort = dataBlock.m_txControlBlock.m_dataPort;
SDRDaemonSuperBlock *txBlockx = dataBlock.m_superBlocks;
if ((nbBlocksFEC == 0) || !m_cm256) // Do not FEC encode
@ -158,7 +158,7 @@ bool SDRDaemonChannelSinkThread::handleDataBlock(SDRDaemonDataBlock& dataBlock)
}
}
dataBlock.m_controlBlock.m_processed = true;
dataBlock.m_txControlBlock.m_processed = true;
return true;
}

View File

@ -20,6 +20,9 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>. //
///////////////////////////////////////////////////////////////////////////////////
#include <boost/crc.hpp>
#include <boost/cstdint.hpp>
#include <QDebug>
#include "util/simpleserializer.h"
@ -53,6 +56,7 @@ SDRDaemonChannelSource::SDRDaemonChannelSource(DeviceSinkAPI *deviceAPI) :
connect(&m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection);
m_cm256p = m_cm256.isInitialized() ? &m_cm256 : 0;
m_currentMeta.init();
}
SDRDaemonChannelSource::~SDRDaemonChannelSource()
@ -79,7 +83,7 @@ void SDRDaemonChannelSource::start()
stop();
}
m_sourceThread = new SDRDaemonChannelSourceThread(&m_dataQueue, m_cm256p);
m_sourceThread = new SDRDaemonChannelSourceThread(&m_dataQueue);
m_sourceThread->startStop(true);
m_sourceThread->dataBind(m_dataAddress, m_dataPort);
m_running = true;
@ -165,8 +169,91 @@ void SDRDaemonChannelSource::applySettings(const SDRDaemonChannelSourceSettings&
m_settings = settings;
}
bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock __attribute__((unused)))
bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock)
{
if (dataBlock.m_rxControlBlock.m_blockCount < SDRDaemonNbOrginalBlocks)
{
qWarning("SDRDaemonChannelSource::handleDataBlock: incomplete data block: not processing");
}
else
{
int blockCount = 0;
for (int blockIndex = 0; blockIndex < 256; blockIndex++)
{
if ((blockIndex == 0) && (dataBlock.m_rxControlBlock.m_metaRetrieved))
{
m_cm256DescriptorBlocks[blockCount].Index = 0;
m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock.m_superBlocks[0].m_protectedBlock);
blockCount++;
}
else if (dataBlock.m_superBlocks[blockIndex].m_header.m_blockIndex != 0)
{
m_cm256DescriptorBlocks[blockCount].Index = dataBlock.m_superBlocks[blockIndex].m_header.m_blockIndex;
m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock.m_superBlocks[blockIndex].m_protectedBlock);
blockCount++;
}
}
//qDebug("SDRDaemonChannelSource::handleDataBlock: frame: %u blocks: %d", dataBlock.m_rxControlBlock.m_frameIndex, blockCount);
// Need to use the CM256 recovery
if (m_cm256p &&(dataBlock.m_rxControlBlock.m_originalCount < SDRDaemonNbOrginalBlocks))
{
qDebug("SDRDaemonChannelSource::handleDataBlock: %d recovery blocks", dataBlock.m_rxControlBlock.m_recoveryCount);
CM256::cm256_encoder_params paramsCM256;
paramsCM256.BlockBytes = sizeof(SDRDaemonProtectedBlock); // never changes
paramsCM256.OriginalCount = SDRDaemonNbOrginalBlocks; // never changes
if (m_currentMeta.m_tv_sec == 0) {
paramsCM256.RecoveryCount = dataBlock.m_rxControlBlock.m_recoveryCount;
} else {
paramsCM256.RecoveryCount = m_currentMeta.m_nbFECBlocks;
}
if (m_cm256.cm256_decode(paramsCM256, m_cm256DescriptorBlocks)) // CM256 decode
{
qWarning() << "SDRDaemonChannelSource::handleDataBlock: decode CM256 error:"
<< " m_originalCount: " << dataBlock.m_rxControlBlock.m_originalCount
<< " m_recoveryCount: " << dataBlock.m_rxControlBlock.m_recoveryCount;
}
else
{
for (int ir = 0; ir < dataBlock.m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks
{
int recoveryIndex = SDRDaemonNbOrginalBlocks - dataBlock.m_rxControlBlock.m_recoveryCount + ir;
int blockIndex = m_cm256DescriptorBlocks[recoveryIndex].Index;
SDRDaemonProtectedBlock *recoveredBlock =
(SDRDaemonProtectedBlock *) m_cm256DescriptorBlocks[recoveryIndex].Block;
memcpy((void *) &(dataBlock.m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(SDRDaemonProtectedBlock));
if ((blockIndex == 0) && !dataBlock.m_rxControlBlock.m_metaRetrieved) {
dataBlock.m_rxControlBlock.m_metaRetrieved = true;
}
}
}
}
// Validate block zero and retrieve its data
if (dataBlock.m_rxControlBlock.m_metaRetrieved)
{
SDRDaemonMetaDataFEC *metaData = (SDRDaemonMetaDataFEC *) &(dataBlock.m_superBlocks[0].m_protectedBlock);
boost::crc_32_type crc32;
crc32.process_bytes(metaData, 20);
if (crc32.checksum() == metaData->m_crc32)
{
if (!(m_currentMeta == *metaData)) {
printMeta("SDRDaemonChannelSource::handleDataBlock", metaData);
}
m_currentMeta = *metaData;
}
else
{
qWarning() << "SDRDaemonChannelSource::handleDataBlock: recovered meta: invalid CRC32";
}
}
}
//TODO: Push into R/W buffer
return true;
}
@ -183,3 +270,17 @@ void SDRDaemonChannelSource::handleData()
}
}
}
void SDRDaemonChannelSource::printMeta(const QString& header, SDRDaemonMetaDataFEC *metaData)
{
qDebug() << header << ": "
<< "|" << metaData->m_centerFrequency
<< ":" << metaData->m_sampleRate
<< ":" << (int) (metaData->m_sampleBytes & 0xF)
<< ":" << (int) metaData->m_sampleBits
<< ":" << (int) metaData->m_nbOriginalBlocks
<< ":" << (int) metaData->m_nbFECBlocks
<< "|" << metaData->m_tv_sec
<< ":" << metaData->m_tv_usec
<< "|";
}

View File

@ -29,6 +29,7 @@
#include "channel/channelsourceapi.h"
#include "channel/sdrdaemonchannelsourcesettings.h"
#include "channel/sdrdaemondataqueue.h"
#include "channel/sdrdaemondatablock.h"
class ThreadedBasebandSampleSource;
class UpChannelizer;
@ -97,8 +98,12 @@ private:
QString m_dataAddress;
uint16_t m_dataPort;
CM256::cm256_block m_cm256DescriptorBlocks[2*SDRDaemonNbOrginalBlocks]; //!< CM256 decoder descriptors (block addresses and block indexes)
SDRDaemonMetaDataFEC m_currentMeta;
void applySettings(const SDRDaemonChannelSourceSettings& settings, bool force = false);
bool handleDataBlock(SDRDaemonDataBlock& dataBlock);
void printMeta(const QString& header, SDRDaemonMetaDataFEC *metaData);
private slots:
void handleData();

View File

@ -20,6 +20,8 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>. //
///////////////////////////////////////////////////////////////////////////////////
#include <algorithm>
#include <QUdpSocket>
#include "channel/sdrdaemondataqueue.h"
@ -31,14 +33,14 @@
MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSourceThread::MsgStartStop, Message)
MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSourceThread::MsgDataBind, Message)
SDRDaemonChannelSourceThread::SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent) :
SDRDaemonChannelSourceThread::SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, QObject* parent) :
QThread(parent),
m_running(false),
m_dataQueue(dataQueue),
m_cm256(cm256),
m_address(QHostAddress::LocalHost),
m_socket(0)
{
std::fill(m_dataBlocks, m_dataBlocks+4, (SDRDaemonDataBlock *) 0);
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
}
@ -131,17 +133,69 @@ void SDRDaemonChannelSourceThread::handleInputMessages()
void SDRDaemonChannelSourceThread::readPendingDatagrams()
{
char data[1024];
SDRDaemonSuperBlock superBlock;
qint64 size;
while (m_socket->hasPendingDatagrams())
{
QHostAddress sender;
quint16 senderPort = 0;
qint64 pendingDataSize = m_socket->pendingDatagramSize();
m_socket->readDatagram(data, pendingDataSize, &sender, &senderPort);
qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: %lld bytes received from %s:%d",
pendingDataSize,
qPrintable(sender.toString()),
senderPort);
//qint64 pendingDataSize = m_socket->pendingDatagramSize();
size = m_socket->readDatagram((char *) &superBlock, (long long int) sizeof(SDRDaemonSuperBlock), &sender, &senderPort);
if (size == sizeof(SDRDaemonSuperBlock))
{
unsigned int dataBlockIndex = superBlock.m_header.m_frameIndex % m_nbDataBlocks;
// create the first block for this index
if (m_dataBlocks[dataBlockIndex] == 0) {
m_dataBlocks[dataBlockIndex] = new SDRDaemonDataBlock();
}
if (m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex < 0)
{
// initialize virgin block with the frame index
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex;
}
else
{
// if the frame index is not the same for the same slot it means we are starting a new frame
uint32_t frameIndex = m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex;
if (superBlock.m_header.m_frameIndex != frameIndex)
{
//qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: push frame %u", frameIndex);
m_dataQueue->push(m_dataBlocks[dataBlockIndex]);
m_dataBlocks[dataBlockIndex] = new SDRDaemonDataBlock();
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex;
}
}
m_dataBlocks[dataBlockIndex]->m_superBlocks[superBlock.m_header.m_blockIndex] = superBlock;
if (superBlock.m_header.m_blockIndex == 0) {
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_metaRetrieved = true;
}
if (superBlock.m_header.m_blockIndex < SDRDaemonNbOrginalBlocks) {
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_originalCount++;
} else {
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_recoveryCount++;
}
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_blockCount++;
// // if enough data blocks to decode push into data queue
// if (m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_blockCount == SDRDaemonNbOrginalBlocks)
// {
// //qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: push frame %u", superBlock.m_header.m_frameIndex);
// m_dataQueue->push(m_dataBlocks[dataBlockIndex]);
// m_dataBlocks[dataBlockIndex] = new SDRDaemonDataBlock();
// }
}
else
{
qWarning("SDRDaemonChannelSourceThread::readPendingDatagrams: wrong super block size not processing");
}
}
}

View File

@ -33,7 +33,6 @@
class SDRDaemonDataQueue;
class SDRDaemonDataBlock;
class CM256;
class QUdpSocket;
class SDRDaemonChannelSourceThread : public QThread {
@ -81,7 +80,7 @@ public:
}
};
SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent = 0);
SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, QObject* parent = 0);
~SDRDaemonChannelSourceThread();
void startStop(bool start);
@ -94,11 +93,13 @@ private:
MessageQueue m_inputMessageQueue;
SDRDaemonDataQueue *m_dataQueue;
CM256 *m_cm256; //!< CM256 library object
QHostAddress m_address;
QUdpSocket *m_socket;
static const uint32_t m_nbDataBlocks = 4; //!< number of data blocks in the ring buffer
SDRDaemonDataBlock *m_dataBlocks[m_nbDataBlocks]; //!< ring buffer of data blocks indexed by frame affinity
void startWork();
void stopWork();

View File

@ -57,7 +57,7 @@ struct SDRDaemonMetaDataFEC
m_sampleBytes = 0;
m_sampleBits = 0;
m_nbOriginalBlocks = 0;
m_nbFECBlocks = -1;
m_nbFECBlocks = 0;
m_tv_sec = 0;
m_tv_usec = 0;
m_crc32 = 0;
@ -125,6 +125,23 @@ struct SDRDaemonTxControlBlock
}
};
struct SDRDaemonRxControlBlock
{
int m_blockCount; //!< number of blocks received for this frame
int m_originalCount; //!< number of original blocks received
int m_recoveryCount; //!< number of recovery blocks received
bool m_metaRetrieved; //!< true if meta data (block zero) was retrieved
int m_frameIndex; //!< this frame index or -1 if unset
SDRDaemonRxControlBlock() {
m_blockCount = 0;
m_originalCount = 0;
m_recoveryCount = 0;
m_metaRetrieved = false;
m_frameIndex = -1;
}
};
class SDRDaemonDataBlock
{
public:
@ -134,7 +151,8 @@ public:
~SDRDaemonDataBlock() {
delete[] m_superBlocks;
}
SDRDaemonTxControlBlock m_controlBlock;
SDRDaemonTxControlBlock m_txControlBlock;
SDRDaemonRxControlBlock m_rxControlBlock;
SDRDaemonSuperBlock *m_superBlocks;
};