1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2025-02-03 09:44:01 -05:00

SDRdaemonSink: added UDPSinkFEC class

This commit is contained in:
f4exb 2017-05-21 04:19:12 +02:00
parent 7f539f0314
commit 8b703a1302
8 changed files with 663 additions and 109 deletions

View File

@ -6,6 +6,7 @@ set(sdrdaemonsink_SOURCES
# sdrdaemonsinkplugin.cpp
sdrdaemonsinksettings.cpp
# sdrdaemonsinkthread.cpp
udpsinkfec.cpp
)
set(sdrdaemonsink_HEADERS
@ -14,6 +15,7 @@ set(sdrdaemonsink_HEADERS
# sdrdaemonsinkplugin.h
sdrdaemonsinksettings.h
# sdrdaemonsinkthread.h
udpsinkfec.h
)
set(sdrdaemonsink_FORMS
@ -23,6 +25,8 @@ set(sdrdaemonsink_FORMS
include_directories(
.
${CMAKE_CURRENT_BINARY_DIR}
${CMAKE_SOURCE_DIR}/devices
${CM256CC_INCLUDE_DIR}
)
add_definitions(${QT_DEFINITIONS})
@ -40,6 +44,7 @@ add_library(outputsdrdaemonsink SHARED
target_link_libraries(outputsdrdaemonsink
${QT_LIBRARIES}
sdrbase
${CM256CC_LIBRARIES}
)
qt5_use_modules(outputsdrdaemonsink Core Widgets)

View File

@ -65,7 +65,7 @@ FileSinkGui::FileSinkGui(DeviceSinkAPI *deviceAPI, QWidget* parent) :
displaySettings();
m_deviceSampleSink = new FileSinkOutput(m_deviceAPI, m_deviceAPI->getMainWindow()->getMasterTimer());
m_deviceSampleSink = new SDRdaemonSinkOutput(m_deviceAPI, m_deviceAPI->getMainWindow()->getMasterTimer());
connect(m_deviceSampleSink->getOutputMessageQueueToGUI(), SIGNAL(messageEnqueued()), this, SLOT(handleSinkMessages()));
m_deviceAPI->setSink(m_deviceSampleSink);
@ -130,15 +130,15 @@ bool FileSinkGui::deserialize(const QByteArray& data)
bool FileSinkGui::handleMessage(const Message& message)
{
if (FileSinkOutput::MsgReportFileSinkGeneration::match(message))
if (SDRdaemonSinkOutput::MsgReportSDRdaemonSinkGeneration::match(message))
{
m_generation = ((FileSinkOutput::MsgReportFileSinkGeneration&)message).getAcquisition();
m_generation = ((SDRdaemonSinkOutput::MsgReportSDRdaemonSinkGeneration&)message).getAcquisition();
updateWithGeneration();
return true;
}
else if (FileSinkOutput::MsgReportFileSinkStreamTiming::match(message))
else if (SDRdaemonSinkOutput::MsgReportSDRdaemonSinkStreamTiming::match(message))
{
m_samplesCount = ((FileSinkOutput::MsgReportFileSinkStreamTiming&)message).getSamplesCount();
m_samplesCount = ((SDRdaemonSinkOutput::MsgReportSDRdaemonSinkStreamTiming&)message).getSamplesCount();
updateWithStreamTime();
return true;
}
@ -207,7 +207,7 @@ void FileSinkGui::sendSettings()
void FileSinkGui::updateHardware()
{
qDebug() << "FileSinkGui::updateHardware";
FileSinkOutput::MsgConfigureFileSink* message = FileSinkOutput::MsgConfigureFileSink::create(m_settings);
SDRdaemonSinkOutput::MsgConfigureSDRdaemonSink* message = SDRdaemonSinkOutput::MsgConfigureSDRdaemonSink::create(m_settings);
m_deviceSampleSink->getInputMessageQueue()->push(message);
m_updateTimer.stop();
}
@ -301,7 +301,7 @@ void FileSinkGui::on_showFileDialog_clicked(bool checked)
void FileSinkGui::configureFileName()
{
qDebug() << "FileSinkGui::configureFileName: " << m_fileName.toStdString().c_str();
FileSinkOutput::MsgConfigureFileSinkName* message = FileSinkOutput::MsgConfigureFileSinkName::create(m_fileName);
SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkName* message = SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkName::create(m_fileName);
m_deviceSampleSink->getInputMessageQueue()->push(message);
}
@ -331,7 +331,7 @@ void FileSinkGui::tick()
{
if ((++m_tickCount & 0xf) == 0)
{
FileSinkOutput::MsgConfigureFileSinkStreamTiming* message = FileSinkOutput::MsgConfigureFileSinkStreamTiming::create();
SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkStreamTiming* message = SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkStreamTiming::create();
m_deviceSampleSink->getInputMessageQueue()->push(message);
}
}

View File

@ -29,7 +29,7 @@
</font>
</property>
<property name="windowTitle">
<string>FileSource</string>
<string>SDRdaemon Sink</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<property name="spacing">

View File

@ -29,14 +29,14 @@
#include "sdrdaemonsinkoutput.h"
#include "sdrdaemonsinkthread.h"
MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgConfigureFileSink, Message)
MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgConfigureFileSinkName, Message)
MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgConfigureFileSinkWork, Message)
MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgConfigureFileSinkStreamTiming, Message)
MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgReportFileSinkGeneration, Message)
MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgReportFileSinkStreamTiming, Message)
MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgConfigureSDRdaemonSink, Message)
MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkName, Message)
MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkWork, Message)
MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkStreamTiming, Message)
MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgReportSDRdaemonSinkGeneration, Message)
MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgReportSDRdaemonSinkStreamTiming, Message)
FileSinkOutput::FileSinkOutput(DeviceSinkAPI *deviceAPI, const QTimer& masterTimer) :
SDRdaemonSinkOutput::SDRdaemonSinkOutput(DeviceSinkAPI *deviceAPI, const QTimer& masterTimer) :
m_deviceAPI(deviceAPI),
m_settings(),
m_fileSinkThread(0),
@ -47,12 +47,12 @@ FileSinkOutput::FileSinkOutput(DeviceSinkAPI *deviceAPI, const QTimer& masterTim
{
}
FileSinkOutput::~FileSinkOutput()
SDRdaemonSinkOutput::~SDRdaemonSinkOutput()
{
stop();
}
void FileSinkOutput::openFileStream()
void SDRdaemonSinkOutput::openFileStream()
{
if (m_ofstream.is_open()) {
m_ofstream.close();
@ -70,7 +70,7 @@ void FileSinkOutput::openFileStream()
qDebug() << "FileSinkOutput::openFileStream: " << m_fileName.toStdString().c_str();
}
bool FileSinkOutput::start()
bool SDRdaemonSinkOutput::start()
{
QMutexLocker mutexLocker(&m_mutex);
qDebug() << "FileSinkOutput::start";
@ -93,13 +93,13 @@ bool FileSinkOutput::start()
//applySettings(m_generalSettings, m_settings, true);
qDebug("FileSinkOutput::start: started");
MsgReportFileSinkGeneration *report = MsgReportFileSinkGeneration::create(true); // acquisition on
MsgReportSDRdaemonSinkGeneration *report = MsgReportSDRdaemonSinkGeneration::create(true); // acquisition on
getOutputMessageQueueToGUI()->push(report);
return true;
}
void FileSinkOutput::stop()
void SDRdaemonSinkOutput::stop()
{
qDebug() << "FileSourceInput::stop";
QMutexLocker mutexLocker(&m_mutex);
@ -115,49 +115,49 @@ void FileSinkOutput::stop()
m_ofstream.close();
}
MsgReportFileSinkGeneration *report = MsgReportFileSinkGeneration::create(false); // acquisition off
MsgReportSDRdaemonSinkGeneration *report = MsgReportSDRdaemonSinkGeneration::create(false); // acquisition off
getOutputMessageQueueToGUI()->push(report);
}
const QString& FileSinkOutput::getDeviceDescription() const
const QString& SDRdaemonSinkOutput::getDeviceDescription() const
{
return m_deviceDescription;
}
int FileSinkOutput::getSampleRate() const
int SDRdaemonSinkOutput::getSampleRate() const
{
return m_settings.m_sampleRate;
}
quint64 FileSinkOutput::getCenterFrequency() const
quint64 SDRdaemonSinkOutput::getCenterFrequency() const
{
return m_settings.m_centerFrequency;
}
std::time_t FileSinkOutput::getStartingTimeStamp() const
std::time_t SDRdaemonSinkOutput::getStartingTimeStamp() const
{
return m_startingTimeStamp;
}
bool FileSinkOutput::handleMessage(const Message& message)
bool SDRdaemonSinkOutput::handleMessage(const Message& message)
{
if (MsgConfigureFileSinkName::match(message))
if (MsgConfigureSDRdaemonSinkName::match(message))
{
MsgConfigureFileSinkName& conf = (MsgConfigureFileSinkName&) message;
MsgConfigureSDRdaemonSinkName& conf = (MsgConfigureSDRdaemonSinkName&) message;
m_fileName = conf.getFileName();
openFileStream();
return true;
}
else if (MsgConfigureFileSink::match(message))
else if (MsgConfigureSDRdaemonSink::match(message))
{
qDebug() << "FileSinkOutput::handleMessage: MsgConfigureFileSink";
MsgConfigureFileSink& conf = (MsgConfigureFileSink&) message;
MsgConfigureSDRdaemonSink& conf = (MsgConfigureSDRdaemonSink&) message;
applySettings(conf.getSettings(), false);
return true;
}
else if (MsgConfigureFileSinkWork::match(message))
else if (MsgConfigureSDRdaemonSinkWork::match(message))
{
MsgConfigureFileSinkWork& conf = (MsgConfigureFileSinkWork&) message;
MsgConfigureSDRdaemonSinkWork& conf = (MsgConfigureSDRdaemonSinkWork&) message;
bool working = conf.isWorking();
if (m_fileSinkThread != 0)
@ -174,13 +174,13 @@ bool FileSinkOutput::handleMessage(const Message& message)
return true;
}
else if (MsgConfigureFileSinkStreamTiming::match(message))
else if (MsgConfigureSDRdaemonSinkStreamTiming::match(message))
{
MsgReportFileSinkStreamTiming *report;
MsgReportSDRdaemonSinkStreamTiming *report;
if (m_fileSinkThread != 0)
{
report = MsgReportFileSinkStreamTiming::create(m_fileSinkThread->getSamplesCount());
report = MsgReportSDRdaemonSinkStreamTiming::create(m_fileSinkThread->getSamplesCount());
getOutputMessageQueueToGUI()->push(report);
}
@ -192,7 +192,7 @@ bool FileSinkOutput::handleMessage(const Message& message)
}
}
void FileSinkOutput::applySettings(const FileSinkSettings& settings, bool force)
void SDRdaemonSinkOutput::applySettings(const FileSinkSettings& settings, bool force)
{
QMutexLocker mutexLocker(&m_mutex);
bool forwardChange = false;

View File

@ -1,5 +1,5 @@
///////////////////////////////////////////////////////////////////////////////////
// Copyright (C) 2016 Edouard Griffiths, F4EXB //
// Copyright (C) 2017 Edouard Griffiths, F4EXB //
// //
// This program is free software; you can redistribute it and/or modify //
// it under the terms of the GNU General Public License as published by //
@ -14,8 +14,8 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>. //
///////////////////////////////////////////////////////////////////////////////////
#ifndef INCLUDE_FILESINKOUTPUT_H
#define INCLUDE_FILESINKOUTPUT_H
#ifndef INCLUDE_SDRDAEMONSINKOUTPUT_H
#define INCLUDE_SDRDAEMONSINKOUTPUT_H
#include <QString>
#include <QTimer>
@ -27,130 +27,130 @@
#include "sdrdaemonsinksettings.h"
class FileSinkThread;
class SDRdaemonSinkThread;
class DeviceSinkAPI;
class FileSinkOutput : public DeviceSampleSink {
class SDRdaemonSinkOutput : public DeviceSampleSink {
public:
class MsgConfigureFileSink : public Message {
class MsgConfigureSDRdaemonSink : public Message {
MESSAGE_CLASS_DECLARATION
public:
const FileSinkSettings& getSettings() const { return m_settings; }
const SDRdaemonSinkSettings& getSettings() const { return m_settings; }
static MsgConfigureFileSink* create(const FileSinkSettings& settings)
static MsgConfigureSDRdaemonSink* create(const SDRdaemonSinkSettings& settings)
{
return new MsgConfigureFileSink(settings);
return new MsgConfigureSDRdaemonSink(settings);
}
private:
FileSinkSettings m_settings;
SDRdaemonSinkSettings m_settings;
MsgConfigureFileSink(const FileSinkSettings& settings) :
MsgConfigureSDRdaemonSink(const SDRdaemonSinkSettings& settings) :
Message(),
m_settings(settings)
{ }
};
class MsgConfigureFileSinkName : public Message {
class MsgConfigureSDRdaemonSinkName : public Message {
MESSAGE_CLASS_DECLARATION
public:
const QString& getFileName() const { return m_fileName; }
static MsgConfigureFileSinkName* create(const QString& fileName)
static MsgConfigureSDRdaemonSinkName* create(const QString& fileName)
{
return new MsgConfigureFileSinkName(fileName);
return new MsgConfigureSDRdaemonSinkName(fileName);
}
private:
QString m_fileName;
MsgConfigureFileSinkName(const QString& fileName) :
MsgConfigureSDRdaemonSinkName(const QString& fileName) :
Message(),
m_fileName(fileName)
{ }
};
class MsgConfigureFileSinkWork : public Message {
class MsgConfigureSDRdaemonSinkWork : public Message {
MESSAGE_CLASS_DECLARATION
public:
bool isWorking() const { return m_working; }
static MsgConfigureFileSinkWork* create(bool working)
static MsgConfigureSDRdaemonSinkWork* create(bool working)
{
return new MsgConfigureFileSinkWork(working);
return new MsgConfigureSDRdaemonSinkWork(working);
}
private:
bool m_working;
MsgConfigureFileSinkWork(bool working) :
MsgConfigureSDRdaemonSinkWork(bool working) :
Message(),
m_working(working)
{ }
};
class MsgConfigureFileSinkStreamTiming : public Message {
class MsgConfigureSDRdaemonSinkStreamTiming : public Message {
MESSAGE_CLASS_DECLARATION
public:
static MsgConfigureFileSinkStreamTiming* create()
static MsgConfigureSDRdaemonSinkStreamTiming* create()
{
return new MsgConfigureFileSinkStreamTiming();
return new MsgConfigureSDRdaemonSinkStreamTiming();
}
private:
MsgConfigureFileSinkStreamTiming() :
MsgConfigureSDRdaemonSinkStreamTiming() :
Message()
{ }
};
class MsgReportFileSinkGeneration : public Message {
class MsgReportSDRdaemonSinkGeneration : public Message {
MESSAGE_CLASS_DECLARATION
public:
bool getAcquisition() const { return m_acquisition; }
static MsgReportFileSinkGeneration* create(bool acquisition)
static MsgReportSDRdaemonSinkGeneration* create(bool acquisition)
{
return new MsgReportFileSinkGeneration(acquisition);
return new MsgReportSDRdaemonSinkGeneration(acquisition);
}
protected:
bool m_acquisition;
MsgReportFileSinkGeneration(bool acquisition) :
MsgReportSDRdaemonSinkGeneration(bool acquisition) :
Message(),
m_acquisition(acquisition)
{ }
};
class MsgReportFileSinkStreamTiming : public Message {
class MsgReportSDRdaemonSinkStreamTiming : public Message {
MESSAGE_CLASS_DECLARATION
public:
std::size_t getSamplesCount() const { return m_samplesCount; }
static MsgReportFileSinkStreamTiming* create(std::size_t samplesCount)
static MsgReportSDRdaemonSinkStreamTiming* create(std::size_t samplesCount)
{
return new MsgReportFileSinkStreamTiming(samplesCount);
return new MsgReportSDRdaemonSinkStreamTiming(samplesCount);
}
protected:
std::size_t m_samplesCount;
MsgReportFileSinkStreamTiming(std::size_t samplesCount) :
MsgReportSDRdaemonSinkStreamTiming(std::size_t samplesCount) :
Message(),
m_samplesCount(samplesCount)
{ }
};
FileSinkOutput(DeviceSinkAPI *deviceAPI, const QTimer& masterTimer);
virtual ~FileSinkOutput();
SDRdaemonSinkOutput(DeviceSinkAPI *deviceAPI, const QTimer& masterTimer);
virtual ~SDRdaemonSinkOutput();
virtual bool start();
virtual void stop();
@ -165,16 +165,16 @@ public:
private:
DeviceSinkAPI *m_deviceAPI;
QMutex m_mutex;
FileSinkSettings m_settings;
SDRdaemonSinkSettings m_settings;
std::ofstream m_ofstream;
FileSinkThread* m_fileSinkThread;
SDRdaemonSinkThread* m_fileSinkThread;
QString m_deviceDescription;
QString m_fileName;
std::time_t m_startingTimeStamp;
const QTimer& m_masterTimer;
void openFileStream();
void applySettings(const FileSinkSettings& settings, bool force = false);
void applySettings(const SDRdaemonSinkSettings& settings, bool force = false);
};
#endif // INCLUDE_FILESINKOUTPUT_H
#endif // INCLUDE_SDRDAEMONSINKOUTPUT_H

View File

@ -121,7 +121,7 @@ void SDRdaemonSinkThread::setLog2Interpolation(int log2Interpolation)
if (log2Interpolation != m_log2Interpolation)
{
qDebug() << "FileSinkThread::setLog2Interpolation:"
qDebug() << "SDRdaemonSinkThread::setLog2Interpolation:"
<< " new:" << log2Interpolation
<< " old:" << m_log2Interpolation;
@ -187,44 +187,47 @@ void SDRdaemonSinkThread::tick()
SampleVector::iterator readUntil;
m_sampleFifo->readAdvance(readUntil, m_samplesChunkSize);
m_sampleFifo->readAdvance(readUntil, m_samplesChunkSize); // pull samples
SampleVector::iterator beginRead = readUntil - m_samplesChunkSize;
m_samplesCount += m_samplesChunkSize;
if (m_log2Interpolation == 0)
{
m_ofstream->write(reinterpret_cast<char*>(&(*beginRead)), m_samplesChunkSize*sizeof(Sample));
}
else
{
int chunkSize = std::min((int) m_samplesChunkSize, m_samplerate);
m_ofstream->write(reinterpret_cast<char*>(&(*beginRead)), m_samplesChunkSize*sizeof(Sample)); // send samples
switch (m_log2Interpolation)
{
case 1:
m_interpolators.interpolate2_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
break;
case 2:
m_interpolators.interpolate4_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
break;
case 3:
m_interpolators.interpolate8_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
break;
case 4:
m_interpolators.interpolate16_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
break;
case 5:
m_interpolators.interpolate32_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
break;
case 6:
m_interpolators.interpolate64_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
break;
default:
break;
}
m_ofstream->write(reinterpret_cast<char*>(m_buf), m_samplesChunkSize*(1<<m_log2Interpolation)*2*sizeof(int16_t));
}
// interpolation is done on the far side
// if (m_log2Interpolation == 0)
// {
// m_ofstream->write(reinterpret_cast<char*>(&(*beginRead)), m_samplesChunkSize*sizeof(Sample)); // send samples
// }
// else
// {
// int chunkSize = std::min((int) m_samplesChunkSize, m_samplerate);
//
// switch (m_log2Interpolation)
// {
// case 1:
// m_interpolators.interpolate2_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
// break;
// case 2:
// m_interpolators.interpolate4_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
// break;
// case 3:
// m_interpolators.interpolate8_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
// break;
// case 4:
// m_interpolators.interpolate16_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
// break;
// case 5:
// m_interpolators.interpolate32_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
// break;
// case 6:
// m_interpolators.interpolate64_cen(&beginRead, m_buf, chunkSize*(1<<m_log2Interpolation)*2);
// break;
// default:
// break;
// }
//
// m_ofstream->write(reinterpret_cast<char*>(m_buf), m_samplesChunkSize*(1<<m_log2Interpolation)*2*sizeof(int16_t)); // send samples
// }
}
}

View File

@ -0,0 +1,298 @@
///////////////////////////////////////////////////////////////////////////////////
// Copyright (C) 2017 Edouard Griffiths, F4EXB //
// //
// This program is free software; you can redistribute it and/or modify //
// it under the terms of the GNU General Public License as published by //
// the Free Software Foundation as version 3 of the License, or //
// //
// This program is distributed in the hope that it will be useful, //
// but WITHOUT ANY WARRANTY; without even the implied warranty of //
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
// GNU General Public License V3 for more details. //
// //
// You should have received a copy of the GNU General Public License //
// along with this program. If not, see <http://www.gnu.org/licenses/>. //
///////////////////////////////////////////////////////////////////////////////////
#include <QDebug>
#include <sys/time.h>
#include <unistd.h>
#include <boost/crc.hpp>
#include <boost/cstdint.hpp>
#include "udpsinkfec.h"
MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgUDPFECEncodeAndSend, Message)
MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgConfigureRemoteAddress, Message)
UDPSinkFEC::UDPSinkFEC() :
m_centerFrequency(100000),
m_sampleRate(48000),
m_sampleBytes(1),
m_sampleBits(8),
m_nbSamples(0),
m_nbBlocksFEC(0),
m_txDelay(0),
m_txBlockIndex(0),
m_txBlocksIndex(0),
m_frameCount(0),
m_sampleIndex(0)
{
m_currentMetaFEC.init();
m_bufMeta = new uint8_t[m_udpSize];
m_buf = new uint8_t[m_udpSize];
m_udpWorker = new UDPSinkFECWorker();
m_udpWorker->moveToThread(&m_udpThread);
connect(&(m_udpWorker->m_inputMessageQueue), SIGNAL(messageEnqueued()), m_udpWorker, SLOT(handleInputMessages()));
m_udpThread.start();
}
UDPSinkFEC::~UDPSinkFEC()
{
disconnect(&(m_udpWorker->m_inputMessageQueue), SIGNAL(messageEnqueued()), m_udpWorker, SLOT(handleInputMessages()));
m_udpThread.exit();
m_udpThread.wait();
delete[] m_buf;
delete[] m_bufMeta;
delete m_udpWorker;
}
void UDPSinkFEC::setTxDelay(uint32_t txDelay)
{
qDebug() << "UDPSinkFEC::setTxDelay: txDelay: " << txDelay;
m_txDelay = txDelay;
}
void UDPSinkFEC::setNbBlocksFEC(uint32_t nbBlocksFEC)
{
qDebug() << "UDPSinkFEC::setNbBlocksFEC: nbBlocksFEC: " << nbBlocksFEC;
m_nbBlocksFEC = nbBlocksFEC;
}
void UDPSinkFEC::setRemoteAddress(const QString& address, uint16_t port)
{
qDebug() << "UDPSinkFEC::setRemoteAddress: address: " << address << " port: " << port;
m_udpWorker->setRemoteAddress(address, port);
}
void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunkSize)
{
const SampleVector::iterator end = begin + sampleChunkSize;
SampleVector::iterator it = begin;
while (it != end)
{
int inSamplesIndex = it - begin;
int inRemainingSamples = end - it;
if (m_txBlockIndex == 0) // Tx block index 0 is a block with only meta data
{
struct timeval tv;
MetaDataFEC metaData;
gettimeofday(&tv, 0);
// create meta data TODO: semaphore
metaData.m_centerFrequency = m_centerFrequency;
metaData.m_sampleRate = m_sampleRate;
metaData.m_sampleBytes = m_sampleBytes;
metaData.m_sampleBits = m_sampleBits;
metaData.m_nbOriginalBlocks = m_nbOriginalBlocks;
metaData.m_nbFECBlocks = m_nbBlocksFEC;
metaData.m_tv_sec = tv.tv_sec;
metaData.m_tv_usec = tv.tv_usec;
boost::crc_32_type crc32;
crc32.process_bytes(&metaData, 20);
metaData.m_crc32 = crc32.checksum();
memset((void *) &m_superBlock, 0, m_udpSize);
m_superBlock.header.frameIndex = m_frameCount;
m_superBlock.header.blockIndex = m_txBlockIndex;
memcpy((void *) &m_superBlock.protectedBlock, (const void *) &metaData, sizeof(MetaDataFEC));
if (!(metaData == m_currentMetaFEC))
{
qDebug() << "UDPSinkFEC::write: meta: "
<< "|" << metaData.m_centerFrequency
<< ":" << metaData.m_sampleRate
<< ":" << (int) (metaData.m_sampleBytes & 0xF)
<< ":" << (int) metaData.m_sampleBits
<< "|" << (int) metaData.m_nbOriginalBlocks
<< ":" << (int) metaData.m_nbFECBlocks
<< "|" << metaData.m_tv_sec
<< ":" << metaData.m_tv_usec
<< "|";
m_currentMetaFEC = metaData;
}
m_txBlocks[m_txBlocksIndex][0] = m_superBlock;
m_txBlockIndex = 1; // next Tx block with data
}
if (m_sampleIndex + inRemainingSamples < samplesPerBlock) // there is still room in the current super block
{
memcpy((void *) &m_superBlock.protectedBlock.m_samples[m_sampleIndex],
(const void *) &(*it),
inRemainingSamples * sizeof(Sample));
m_sampleIndex += inRemainingSamples;
it = end; // all input samples are consumed
}
else // complete super block and initiate the next if not end of frame
{
memcpy((void *) &m_superBlock.protectedBlock.m_samples[m_sampleIndex],
(const void *) &(*it),
(samplesPerBlock - m_sampleIndex) * sizeof(Sample));
it += samplesPerBlock - m_sampleIndex;
m_sampleIndex = 0;
m_superBlock.header.frameIndex = m_frameCount;
m_superBlock.header.blockIndex = m_txBlockIndex;
m_txBlocks[m_txBlocksIndex][m_txBlockIndex] = m_superBlock;
if (m_txBlockIndex == m_nbOriginalBlocks - 1) // frame complete
{
int nbBlocksFEC = m_nbBlocksFEC;
int txDelay = m_txDelay;
// TODO: send blocks
m_udpWorker->pushTxFrame(m_txBlocks[m_txBlocksIndex], nbBlocksFEC, txDelay, m_frameCount);
//m_txThread = new std::thread(transmitUDP, this, m_txBlocks[m_txBlocksIndex], m_frameCount, nbBlocksFEC, txDelay, m_cm256Valid);
//transmitUDP(this, m_txBlocks[m_txBlocksIndex], m_frameCount, m_nbBlocksFEC, m_txDelay, m_cm256Valid);
m_txBlocksIndex = (m_txBlocksIndex + 1) % 4;
m_txBlockIndex = 0;
m_frameCount++;
}
else
{
m_txBlockIndex++;
}
}
}
}
UDPSinkFECWorker::UDPSinkFECWorker() : m_remotePort(9090)
{
m_cm256Valid = m_cm256.isInitialized();
}
UDPSinkFECWorker::~UDPSinkFECWorker()
{
}
void UDPSinkFECWorker::pushTxFrame(const UDPSinkFEC::SuperBlock *txBlocks,
uint32_t nbBlocksFEC,
uint32_t txDelay,
uint16_t frameIndex)
{
m_inputMessageQueue.push(MsgUDPFECEncodeAndSend::create(txBlocks, nbBlocksFEC, txDelay, frameIndex));
}
void UDPSinkFECWorker::setRemoteAddress(const QString& address, uint16_t port)
{
m_inputMessageQueue.push(MsgConfigureRemoteAddress::create(address, port));
}
void UDPSinkFECWorker::handleInputMessages()
{
Message* message;
while ((message = m_inputMessageQueue.pop()) != 0)
{
if (MsgUDPFECEncodeAndSend::match(*message))
{
MsgUDPFECEncodeAndSend *sendMsg = (MsgUDPFECEncodeAndSend *) message;
}
else if (MsgConfigureRemoteAddress::match(*message))
{
MsgConfigureRemoteAddress *addressMsg = (MsgConfigureRemoteAddress *) message;
m_remoteAddress.setAddress(addressMsg->getAddress());
m_remotePort = addressMsg->getPort();
}
delete message;
}
}
void UDPSinkFECWorker::transmitUDP(UDPSinkFEC::SuperBlock *txBlockx, uint16_t frameIndex, int nbBlocksFEC, int txDelay)
{
CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder
CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder
UDPSinkFEC::ProtectedBlock fecBlocks[256]; //!< FEC data
if ((nbBlocksFEC == 0) || !m_cm256Valid)
{
for (int i = 0; i < UDPSinkFEC::m_nbOriginalBlocks; i++)
{
m_udpSocket.writeDatagram((const char *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress, m_remotePort);
usleep(txDelay);
}
}
else
{
cm256Params.BlockBytes = sizeof(UDPSinkFEC::ProtectedBlock);
cm256Params.OriginalCount = UDPSinkFEC::m_nbOriginalBlocks;
cm256Params.RecoveryCount = nbBlocksFEC;
// Fill pointers to data
for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; ++i)
{
if (i >= cm256Params.OriginalCount) {
memset((void *) &txBlockx[i].protectedBlock, 0, sizeof(UDPSinkFEC::ProtectedBlock));
}
txBlockx[i].header.frameIndex = frameIndex;
txBlockx[i].header.blockIndex = i;
descriptorBlocks[i].Block = (void *) &(txBlockx[i].protectedBlock);
descriptorBlocks[i].Index = txBlockx[i].header.blockIndex;
}
// Encode FEC blocks
if (m_cm256.cm256_encode(cm256Params, descriptorBlocks, fecBlocks))
{
qDebug() << "UDPSinkFECWorker::transmitUDP: CM256 encode failed. No transmission.";
return;
}
// Merge FEC with data to transmit
for (int i = 0; i < cm256Params.RecoveryCount; i++)
{
txBlockx[i + cm256Params.OriginalCount].protectedBlock = fecBlocks[i];
}
// Transmit all blocks
for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++)
{
#ifdef SDRDAEMON_PUNCTURE
if (i == SDRDAEMON_PUNCTURE) {
continue;
}
#endif
// std::cerr << "UDPSinkFEC::transmitUDP:"
// << " i: " << i
// << " frameIndex: " << (int) m_txBlocks[i].header.frameIndex
// << " blockIndex: " << (int) m_txBlocks[i].header.blockIndex
// << " i.q:";
//
// for (int j = 0; j < 10; j++)
// {
// std::cerr << " " << (int) m_txBlocks[i].protectedBlock.m_samples[j].m_real
// << "." << (int) m_txBlocks[i].protectedBlock.m_samples[j].m_imag;
// }
//
// std::cerr << std::endl;
m_udpSocket.writeDatagram((const char *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress, m_remotePort);
usleep(txDelay);
}
}
}

View File

@ -0,0 +1,248 @@
///////////////////////////////////////////////////////////////////////////////////
// Copyright (C) 2017 Edouard Griffiths, F4EXB //
// //
// This program is free software; you can redistribute it and/or modify //
// it under the terms of the GNU General Public License as published by //
// the Free Software Foundation as version 3 of the License, or //
// //
// This program is distributed in the hope that it will be useful, //
// but WITHOUT ANY WARRANTY; without even the implied warranty of //
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
// GNU General Public License V3 for more details. //
// //
// You should have received a copy of the GNU General Public License //
// along with this program. If not, see <http://www.gnu.org/licenses/>. //
///////////////////////////////////////////////////////////////////////////////////
#ifndef PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFEC_H_
#define PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFEC_H_
#include <string.h>
#include <cstddef>
#include <QObject>
#include <QUdpSocket>
#include <QHostAddress>
#include <QString>
#include <QThread>
#include "cm256.h"
#include "dsp/dsptypes.h"
#include "util/CRC64.h"
#include "util/messagequeue.h"
#include "util/message.h"
class UDPSinkFECWorker;
class UDPSinkFEC : public QObject
{
Q_OBJECT
public:
static const uint32_t m_udpSize = 512; //!< Size of UDP block in number of bytes
static const uint32_t m_nbOriginalBlocks = 128; //!< Number of original blocks in a protected block sequence
#pragma pack(push, 1)
struct MetaDataFEC
{
uint32_t m_centerFrequency; //!< 4 center frequency in kHz
uint32_t m_sampleRate; //!< 8 sample rate in Hz
uint8_t m_sampleBytes; //!< 9 MSB(4): indicators, LSB(4) number of bytes per sample
uint8_t m_sampleBits; //!< 10 number of effective bits per sample
uint8_t m_nbOriginalBlocks; //!< 11 number of blocks with original (protected) data
uint8_t m_nbFECBlocks; //!< 12 number of blocks carrying FEC
uint32_t m_tv_sec; //!< 16 seconds of timestamp at start time of super-frame processing
uint32_t m_tv_usec; //!< 20 microseconds of timestamp at start time of super-frame processing
uint32_t m_crc32; //!< 24 CRC32 of the above
bool operator==(const MetaDataFEC& rhs)
{
return (memcmp((const void *) this, (const void *) &rhs, 12) == 0); // Only the 12 first bytes are relevant
}
void init()
{
memset((void *) this, 0, sizeof(MetaDataFEC));
m_nbFECBlocks = -1;
}
};
struct Header
{
uint16_t frameIndex;
uint8_t blockIndex;
uint8_t filler;
};
static const int samplesPerBlock = (m_udpSize - sizeof(Header)) / sizeof(Sample);
struct ProtectedBlock
{
Sample m_samples[samplesPerBlock];
};
struct SuperBlock
{
Header header;
ProtectedBlock protectedBlock;
};
#pragma pack(pop)
/**
* Construct UDP sink
*/
UDPSinkFEC();
/** Destroy UDP sink */
~UDPSinkFEC();
/**
* Write IQ samples
*/
void write(const SampleVector::iterator& begin, uint32_t sampleChunkSize);
/** Return the last error, or return an empty string if there is no error. */
std::string error()
{
std::string ret(m_error);
m_error.clear();
return ret;
}
/** Set center frequency given in Hz */
void setCenterFrequency(uint64_t centerFrequency) { m_centerFrequency = centerFrequency / 1000; }
/** Set sample rate given in Hz */
void setSampleRate(uint32_t sampleRate) { m_sampleRate = sampleRate; }
void setSampleBytes(uint8_t sampleBytes) { m_sampleBytes = (sampleBytes & 0x0F) + (m_sampleBytes & 0xF0); }
void setSampleBits(uint8_t sampleBits) { m_sampleBits = sampleBits; }
void setNbBlocksFEC(uint32_t nbBlocksFEC);
void setTxDelay(uint32_t txDelay);
void setRemoteAddress(const QString& address, uint16_t port);
/** Return true if the stream is OK, return false if there is an error. */
operator bool() const
{
return m_error.empty();
}
private:
std::string m_error;
uint32_t m_centerFrequency; //!< center frequency in kHz
uint32_t m_sampleRate; //!< sample rate in Hz
uint8_t m_sampleBytes; //!< number of bytes per sample
uint8_t m_sampleBits; //!< number of effective bits per sample
uint32_t m_nbSamples; //!< total number of samples sent int the last frame
QHostAddress m_ownAddress;
CRC64 m_crc64;
uint8_t* m_bufMeta;
uint8_t* m_buf;
MetaDataFEC m_currentMetaFEC; //!< Meta data for current frame
uint32_t m_nbBlocksFEC; //!< Variable number of FEC blocks
uint32_t m_txDelay; //!< Delay in microseconds (usleep) between each sending of an UDP datagram
SuperBlock m_txBlocks[4][256]; //!< UDP blocks to send with original data + FEC
SuperBlock m_superBlock; //!< current super block being built
int m_txBlockIndex; //!< Current index in blocks to transmit in the Tx row
int m_txBlocksIndex; //!< Current index of Tx blocks row
uint16_t m_frameCount; //!< transmission frame count
int m_sampleIndex; //!< Current sample index in protected block data
QThread m_udpThread;
UDPSinkFECWorker *m_udpWorker;
};
class UDPSinkFECWorker : public QObject
{
Q_OBJECT
public:
class MsgUDPFECEncodeAndSend : public Message
{
MESSAGE_CLASS_DECLARATION
public:
const UDPSinkFEC::SuperBlock *getTxBlocks() const { return m_txBlockx; }
uint32_t getNbBlocsFEC() const { return m_nbBlocksFEC; }
uint32_t getTxDelay() const { return m_txDelay; }
uint16_t getFrameIndex() const { return m_frameIndex; }
static MsgUDPFECEncodeAndSend* create(
const UDPSinkFEC::SuperBlock *txBlocks,
uint32_t nbBlocksFEC,
uint32_t txDelay,
uint16_t frameIndex)
{
return new MsgUDPFECEncodeAndSend(txBlocks, nbBlocksFEC, txDelay, frameIndex);
}
private:
const UDPSinkFEC::SuperBlock *m_txBlockx;
uint32_t m_nbBlocksFEC;
uint32_t m_txDelay;
uint16_t m_frameIndex;
MsgUDPFECEncodeAndSend(
const UDPSinkFEC::SuperBlock *txBlocks,
uint32_t nbBlocksFEC,
uint32_t txDelay,
uint16_t frameIndex) :
m_txBlockx(txBlocks),
m_nbBlocksFEC(nbBlocksFEC),
m_txDelay(txDelay),
m_frameIndex(frameIndex)
{}
};
class MsgConfigureRemoteAddress : public Message
{
MESSAGE_CLASS_DECLARATION
public:
const QString& getAddress() const { return m_address; }
uint16_t getPort() const { return m_port; }
static MsgConfigureRemoteAddress* create(const QString& address, uint16_t port)
{
return new MsgConfigureRemoteAddress(address, port);
}
private:
QString m_address;
uint16_t m_port;
MsgConfigureRemoteAddress(const QString& address, uint16_t port) :
m_address(address),
m_port(port)
{}
};
UDPSinkFECWorker();
~UDPSinkFECWorker();
void pushTxFrame(const UDPSinkFEC::SuperBlock *txBlocks,
uint32_t nbBlocksFEC,
uint32_t txDelay,
uint16_t frameIndex);
void setRemoteAddress(const QString& address, uint16_t port);
MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication
public slots:
void handleInputMessages();
private:
void transmitUDP(UDPSinkFEC::SuperBlock *txBlockx, uint16_t frameIndex, int nbBlocksFEC, int txDelay);
QUdpSocket m_udpSocket;
CM256 m_cm256; //!< CM256 library object
bool m_cm256Valid; //!< true if CM256 library is initialized correctly
QHostAddress m_remoteAddress;
uint16_t m_remotePort;
};
#endif /* PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFEC_H_ */