SDRdaemon: data read queue

This commit is contained in:
f4exb 2018-08-28 06:33:15 +02:00
parent 82cba84a50
commit 8196a70753
7 changed files with 198 additions and 132 deletions

View File

@ -8,11 +8,11 @@ set(sdrdaemon_SOURCES
channel/sdrdaemonchannelsink.cpp
channel/sdrdaemonchannelsource.cpp
channel/sdrdaemondataqueue.cpp
channel/sdrdaemondatareadqueue.cpp
channel/sdrdaemonchannelsinkthread.cpp
channel/sdrdaemonchannelsinksettings.cpp
channel/sdrdaemonchannelsourcesettings.cpp
channel/sdrdaemonchannelsourcethread.cpp
channel/sdrdaemonchannelsourcebuffer.cpp
webapi/webapiadapterdaemon.cpp
webapi/webapirequestmapper.cpp
webapi/webapiserver.cpp
@ -26,12 +26,12 @@ set(sdrdaemon_HEADERS
channel/sdrdaemonchannelsink.h
channel/sdrdaemonchannelsource.h
channel/sdrdaemondataqueue.h
channel/sdrdaemondatareadqueue.h
channel/sdrdaemondatablock.h
channel/sdrdaemonchannelsinkthread.h
channel/sdrdaemonchannelsinksettings.h
channel/sdrdaemonchannelsourcesettings.h
channel/sdrdaemonchannelsourcethread.h
channel/sdrdaemonchannelsourcebuffer.h
webapi/webapiadapterdaemon.h
webapi/webapirequestmapper.h
webapi/webapiserver.h

View File

@ -70,9 +70,7 @@ SDRDaemonChannelSource::~SDRDaemonChannelSource()
void SDRDaemonChannelSource::pull(Sample& sample)
{
sample.m_real = 0.0f;
sample.m_imag = 0.0f;
m_dataReadQueue.readSample(sample);
m_samplesCount++;
}
@ -182,9 +180,9 @@ void SDRDaemonChannelSource::applySettings(const SDRDaemonChannelSourceSettings&
m_settings = settings;
}
bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock)
void SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock* dataBlock)
{
if (dataBlock.m_rxControlBlock.m_blockCount < SDRDaemonNbOrginalBlocks)
if (dataBlock->m_rxControlBlock.m_blockCount < SDRDaemonNbOrginalBlocks)
{
qWarning("SDRDaemonChannelSource::handleDataBlock: incomplete data block: not processing");
}
@ -194,16 +192,16 @@ bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock)
for (int blockIndex = 0; blockIndex < 256; blockIndex++)
{
if ((blockIndex == 0) && (dataBlock.m_rxControlBlock.m_metaRetrieved))
if ((blockIndex == 0) && (dataBlock->m_rxControlBlock.m_metaRetrieved))
{
m_cm256DescriptorBlocks[blockCount].Index = 0;
m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock.m_superBlocks[0].m_protectedBlock);
m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock->m_superBlocks[0].m_protectedBlock);
blockCount++;
}
else if (dataBlock.m_superBlocks[blockIndex].m_header.m_blockIndex != 0)
else if (dataBlock->m_superBlocks[blockIndex].m_header.m_blockIndex != 0)
{
m_cm256DescriptorBlocks[blockCount].Index = dataBlock.m_superBlocks[blockIndex].m_header.m_blockIndex;
m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock.m_superBlocks[blockIndex].m_protectedBlock);
m_cm256DescriptorBlocks[blockCount].Index = dataBlock->m_superBlocks[blockIndex].m_header.m_blockIndex;
m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock->m_superBlocks[blockIndex].m_protectedBlock);
blockCount++;
}
}
@ -211,15 +209,15 @@ bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock)
//qDebug("SDRDaemonChannelSource::handleDataBlock: frame: %u blocks: %d", dataBlock.m_rxControlBlock.m_frameIndex, blockCount);
// Need to use the CM256 recovery
if (m_cm256p &&(dataBlock.m_rxControlBlock.m_originalCount < SDRDaemonNbOrginalBlocks))
if (m_cm256p &&(dataBlock->m_rxControlBlock.m_originalCount < SDRDaemonNbOrginalBlocks))
{
qDebug("SDRDaemonChannelSource::handleDataBlock: %d recovery blocks", dataBlock.m_rxControlBlock.m_recoveryCount);
qDebug("SDRDaemonChannelSource::handleDataBlock: %d recovery blocks", dataBlock->m_rxControlBlock.m_recoveryCount);
CM256::cm256_encoder_params paramsCM256;
paramsCM256.BlockBytes = sizeof(SDRDaemonProtectedBlock); // never changes
paramsCM256.OriginalCount = SDRDaemonNbOrginalBlocks; // never changes
if (m_currentMeta.m_tv_sec == 0) {
paramsCM256.RecoveryCount = dataBlock.m_rxControlBlock.m_recoveryCount;
paramsCM256.RecoveryCount = dataBlock->m_rxControlBlock.m_recoveryCount;
} else {
paramsCM256.RecoveryCount = m_currentMeta.m_nbFECBlocks;
}
@ -227,29 +225,29 @@ bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock)
if (m_cm256.cm256_decode(paramsCM256, m_cm256DescriptorBlocks)) // CM256 decode
{
qWarning() << "SDRDaemonChannelSource::handleDataBlock: decode CM256 error:"
<< " m_originalCount: " << dataBlock.m_rxControlBlock.m_originalCount
<< " m_recoveryCount: " << dataBlock.m_rxControlBlock.m_recoveryCount;
<< " m_originalCount: " << dataBlock->m_rxControlBlock.m_originalCount
<< " m_recoveryCount: " << dataBlock->m_rxControlBlock.m_recoveryCount;
}
else
{
for (int ir = 0; ir < dataBlock.m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks
for (int ir = 0; ir < dataBlock->m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks
{
int recoveryIndex = SDRDaemonNbOrginalBlocks - dataBlock.m_rxControlBlock.m_recoveryCount + ir;
int recoveryIndex = SDRDaemonNbOrginalBlocks - dataBlock->m_rxControlBlock.m_recoveryCount + ir;
int blockIndex = m_cm256DescriptorBlocks[recoveryIndex].Index;
SDRDaemonProtectedBlock *recoveredBlock =
(SDRDaemonProtectedBlock *) m_cm256DescriptorBlocks[recoveryIndex].Block;
memcpy((void *) &(dataBlock.m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(SDRDaemonProtectedBlock));
if ((blockIndex == 0) && !dataBlock.m_rxControlBlock.m_metaRetrieved) {
dataBlock.m_rxControlBlock.m_metaRetrieved = true;
memcpy((void *) &(dataBlock->m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(SDRDaemonProtectedBlock));
if ((blockIndex == 0) && !dataBlock->m_rxControlBlock.m_metaRetrieved) {
dataBlock->m_rxControlBlock.m_metaRetrieved = true;
}
}
}
}
// Validate block zero and retrieve its data
if (dataBlock.m_rxControlBlock.m_metaRetrieved)
if (dataBlock->m_rxControlBlock.m_metaRetrieved)
{
SDRDaemonMetaDataFEC *metaData = (SDRDaemonMetaDataFEC *) &(dataBlock.m_superBlocks[0].m_protectedBlock);
SDRDaemonMetaDataFEC *metaData = (SDRDaemonMetaDataFEC *) &(dataBlock->m_superBlocks[0].m_protectedBlock);
boost::crc_32_type crc32;
crc32.process_bytes(metaData, 20);
@ -275,21 +273,17 @@ bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock)
qWarning() << "SDRDaemonChannelSource::handleDataBlock: recovered meta: invalid CRC32";
}
}
m_dataReadQueue.push(dataBlock); // Push into R/W buffer
}
//TODO: Push into R/W buffer
return true;
}
void SDRDaemonChannelSource::handleData()
{
SDRDaemonDataBlock* dataBlock;
while (m_running && ((dataBlock = m_dataQueue.pop()) != 0))
{
if (handleDataBlock(*dataBlock))
{
delete dataBlock;
}
while (m_running && ((dataBlock = m_dataQueue.pop()) != 0)) {
handleDataBlock(dataBlock);
}
}

View File

@ -30,6 +30,7 @@
#include "channel/sdrdaemonchannelsourcesettings.h"
#include "channel/sdrdaemondataqueue.h"
#include "channel/sdrdaemondatablock.h"
#include "channel/sdrdaemondatareadqueue.h"
class ThreadedBasebandSampleSource;
class UpChannelizer;
@ -101,8 +102,10 @@ private:
CM256::cm256_block m_cm256DescriptorBlocks[2*SDRDaemonNbOrginalBlocks]; //!< CM256 decoder descriptors (block addresses and block indexes)
SDRDaemonMetaDataFEC m_currentMeta;
SDRDaemonDataReadQueue m_dataReadQueue;
void applySettings(const SDRDaemonChannelSourceSettings& settings, bool force = false);
bool handleDataBlock(SDRDaemonDataBlock& dataBlock);
void handleDataBlock(SDRDaemonDataBlock *dataBlock);
void printMeta(const QString& header, SDRDaemonMetaDataFEC *metaData);
private slots:

View File

@ -1,85 +0,0 @@
///////////////////////////////////////////////////////////////////////////////////
// Copyright (C) 2018 Edouard Griffiths, F4EXB. //
// //
// SDRdaemon source channel (Tx). Samples buffer //
// //
// SDRdaemon is a detached SDR front end that handles the interface with a //
// physical device and sends or receives the I/Q samples stream to or from a //
// SDRangel instance via UDP. It is controlled via a Web REST API. //
// //
// This program is free software; you can redistribute it and/or modify //
// it under the terms of the GNU General Public License as published by //
// the Free Software Foundation as version 3 of the License, or //
// //
// This program is distributed in the hope that it will be useful, //
// but WITHOUT ANY WARRANTY; without even the implied warranty of //
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
// GNU General Public License V3 for more details. //
// //
// You should have received a copy of the GNU General Public License //
// along with this program. If not, see <http://www.gnu.org/licenses/>. //
///////////////////////////////////////////////////////////////////////////////////
#include "channel/sdrdaemonchannelsourcebuffer.h"
SDRDaemonChannelSourceBuffer::SDRDaemonChannelSourceBuffer(uint32_t nbSamples)
{
m_buffer = new Sample[nbSamples];
m_wp = nbSamples/2;
m_rp = 0;
m_size = nbSamples;
}
SDRDaemonChannelSourceBuffer::~SDRDaemonChannelSourceBuffer()
{
delete[] m_buffer;
}
void SDRDaemonChannelSourceBuffer::resize(uint32_t nbSamples)
{
if (nbSamples > m_size)
{
delete[] m_buffer;
m_buffer = new Sample[nbSamples];
}
m_wp = nbSamples/2;
m_rp = 0;
m_size = nbSamples;
}
void SDRDaemonChannelSourceBuffer::write(Sample *begin, uint32_t nbSamples)
{
if (m_wp + nbSamples < m_size)
{
std::copy(begin, begin+nbSamples, &m_buffer[m_wp]);
m_wp += nbSamples;
}
else // wrap
{
int first = m_size - m_wp;
std::copy(begin, begin+first, &m_buffer[m_wp]);
int second = nbSamples - first;
std::copy(begin+first, begin+nbSamples, m_buffer);
m_wp = second;
}
}
void SDRDaemonChannelSourceBuffer::readOne(Sample& sample)
{
sample = m_buffer[m_rp];
m_rp++;
if (m_rp == m_size) { // wrap
m_rp = 0;
}
}
int SDRDaemonChannelSourceBuffer::getRWBalancePercent()
{
if (m_wp > m_rp) {
return (((m_wp - m_rp) - (m_size/2))*100)/m_size;
} else {
return (((m_size/2) - (m_rp - m_wp))*100)/m_size;
}
}

View File

@ -26,6 +26,7 @@
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <QString>
#include "dsp/dsptypes.h"
#define UDPSINKFEC_UDPSIZE 512

View File

@ -0,0 +1,142 @@
///////////////////////////////////////////////////////////////////////////////////
// Copyright (C) 2018 Edouard Griffiths, F4EXB. //
// //
// SDRdaemon sink channel (Rx) data blocks to read queue //
// //
// SDRdaemon is a detached SDR front end that handles the interface with a //
// physical device and sends or receives the I/Q samples stream to or from a //
// SDRangel instance via UDP. It is controlled via a Web REST API. //
// //
// This program is free software; you can redistribute it and/or modify //
// it under the terms of the GNU General Public License as published by //
// the Free Software Foundation as version 3 of the License, or //
// //
// This program is distributed in the hope that it will be useful, //
// but WITHOUT ANY WARRANTY; without even the implied warranty of //
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
// GNU General Public License V3 for more details. //
// //
// You should have received a copy of the GNU General Public License //
// along with this program. If not, see <http://www.gnu.org/licenses/>. //
///////////////////////////////////////////////////////////////////////////////////
#include "channel/sdrdaemondatablock.h"
#include "channel/sdrdaemondatareadqueue.h"
const uint32_t SDRDaemonDataReadQueue::MaxSize = 20;
SDRDaemonDataReadQueue::SDRDaemonDataReadQueue() :
m_dataBlock(0),
m_blockIndex(1),
m_sampleIndex(0)
{}
SDRDaemonDataReadQueue::~SDRDaemonDataReadQueue()
{
SDRDaemonDataBlock* data;
while ((data = pop()) != 0)
{
qDebug("SDRDaemonDataReadQueue::~SDRDaemonDataReadQueue: data block was still in queue");
delete data;
}
}
void SDRDaemonDataReadQueue::push(SDRDaemonDataBlock* dataBlock)
{
if (size() > MaxSize)
{
qWarning("SDRDaemonDataReadQueue::push: queue is full");
SDRDaemonDataBlock *data = m_dataReadQueue.takeLast();
delete data;
}
m_dataReadQueue.append(dataBlock);
}
SDRDaemonDataBlock* SDRDaemonDataReadQueue::pop()
{
if (m_dataReadQueue.isEmpty())
{
return 0;
}
else
{
m_blockIndex = 1;
m_sampleIndex = 0;
return m_dataReadQueue.takeFirst();
}
}
uint32_t SDRDaemonDataReadQueue::size()
{
return m_dataReadQueue.size();
}
void SDRDaemonDataReadQueue::readSample(Sample& s)
{
// initial state
if (m_dataBlock == 0)
{
if (size() >= MaxSize/2)
{
qDebug("SDRDaemonDataReadQueue::readSample: initial pop new block: queue size: %u", size());
m_blockIndex = 1;
m_dataBlock = m_dataReadQueue.takeFirst();
s = m_dataBlock->m_superBlocks[m_blockIndex].m_protectedBlock.m_samples[m_sampleIndex];
m_sampleIndex++;
}
else
{
s = Sample{0, 0};
}
return;
}
if (m_sampleIndex < SDRDaemonSamplesPerBlock)
{
s = m_dataBlock->m_superBlocks[m_blockIndex].m_protectedBlock.m_samples[m_sampleIndex];
m_sampleIndex++;
}
else
{
m_sampleIndex = 0;
m_blockIndex++;
if (m_blockIndex < SDRDaemonNbOrginalBlocks)
{
s = m_dataBlock->m_superBlocks[m_blockIndex].m_protectedBlock.m_samples[m_sampleIndex];
m_sampleIndex++;
}
else
{
if (m_dataBlock)
{
delete m_dataBlock;
m_dataBlock = 0;
if (size() == 0) {
qWarning("SDRDaemonDataReadQueue::readSample: try to pop new block but queue is empty");
}
}
if (size() > 0)
{
qDebug("SDRDaemonDataReadQueue::readSample: pop new block: queue size: %u", size());
m_blockIndex = 1;
m_dataBlock = m_dataReadQueue.takeFirst();
s = m_dataBlock->m_superBlocks[m_blockIndex].m_protectedBlock.m_samples[m_sampleIndex];
m_sampleIndex++;
}
else
{
s = Sample{0, 0};
}
}
}
}

View File

@ -1,7 +1,7 @@
///////////////////////////////////////////////////////////////////////////////////
// Copyright (C) 2018 Edouard Griffiths, F4EXB. //
// //
// SDRdaemon source channel (Tx). Samples buffer //
// SDRdaemon sink channel (Rx) data blocks to read queue //
// //
// SDRdaemon is a detached SDR front end that handles the interface with a //
// physical device and sends or receives the I/Q samples stream to or from a //
@ -20,23 +20,34 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>. //
///////////////////////////////////////////////////////////////////////////////////
#include <stdint.h>
#ifndef SDRDAEMON_CHANNEL_SDRDAEMONDATAREADQUEUE_H_
#define SDRDAEMON_CHANNEL_SDRDAEMONDATAREADQUEUE_H_
#include "dsp/dsptypes.h"
#include <QQueue>
class SDRDaemonChannelSourceBuffer
class SDRDaemonDataBlock;
class Sample;
class SDRDaemonDataReadQueue
{
public:
SDRDaemonChannelSourceBuffer(uint32_t nbSamples);
~SDRDaemonChannelSourceBuffer();
void resize(uint32_t nbSamples);
void write(Sample *begin, uint32_t nbSamples);
void readOne(Sample& sample);
int getRWBalancePercent(); //!< positive write leads, negative read leads, balance = 0, percentage of buffer size
SDRDaemonDataReadQueue();
~SDRDaemonDataReadQueue();
void push(SDRDaemonDataBlock* dataBlock); //!< push block on the queue
SDRDaemonDataBlock* pop(); //!< Pop block from the queue
void readSample(Sample& s); //!< Read sample from queue
uint32_t size(); //!< Returns queue size
static const uint32_t MaxSize;
private:
Sample *m_buffer;
uint32_t m_size;
uint32_t m_rp;
uint32_t m_wp;
QQueue<SDRDaemonDataBlock*> m_dataReadQueue;
SDRDaemonDataBlock *m_dataBlock;
uint32_t m_blockIndex;
uint32_t m_sampleIndex;
};
#endif /* SDRDAEMON_CHANNEL_SDRDAEMONDATAREADQUEUE_H_ */