diff --git a/plugins/samplesink/remoteoutput/CMakeLists.txt b/plugins/samplesink/remoteoutput/CMakeLists.txt index 0e4a08738..3bf7c60a9 100644 --- a/plugins/samplesink/remoteoutput/CMakeLists.txt +++ b/plugins/samplesink/remoteoutput/CMakeLists.txt @@ -14,7 +14,7 @@ set(remoteoutput_SOURCES remoteoutputplugin.cpp remoteoutputsettings.cpp remoteoutputwebapiadapter.cpp - remoteoutputthread.cpp + remoteoutputworker.cpp udpsinkfec.cpp remoteoutputsender.cpp remoteoutputfifo.cpp @@ -25,7 +25,7 @@ set(remoteoutput_HEADERS remoteoutputplugin.h remoteoutputsettings.h remoteoutputwebapiadapter.h - remoteoutputthread.h + remoteoutputworker.h udpsinkfec.h remoteoutputsender.h remoteoutputfifo.h diff --git a/plugins/samplesink/remoteoutput/remoteoutput.cpp b/plugins/samplesink/remoteoutput/remoteoutput.cpp index a2ac0d115..f3c1293bb 100644 --- a/plugins/samplesink/remoteoutput/remoteoutput.cpp +++ b/plugins/samplesink/remoteoutput/remoteoutput.cpp @@ -36,7 +36,7 @@ #include "device/deviceapi.h" #include "remoteoutput.h" -#include "remoteoutputthread.h" +#include "remoteoutputworker.h" MESSAGE_CLASS_DEFINITION(RemoteOutput::MsgConfigureRemoteOutput, Message) MESSAGE_CLASS_DEFINITION(RemoteOutput::MsgConfigureRemoteOutputWork, Message) @@ -49,7 +49,7 @@ RemoteOutput::RemoteOutput(DeviceAPI *deviceAPI) : m_deviceAPI(deviceAPI), m_settings(), m_centerFrequency(0), - m_remoteOutputThread(0), + m_remoteOutputWorker(nullptr), m_deviceDescription("RemoteOutput"), m_startingTimeStamp(0), m_masterTimer(deviceAPI->getMasterTimer()), @@ -87,12 +87,13 @@ bool RemoteOutput::start() QMutexLocker mutexLocker(&m_mutex); qDebug() << "RemoteOutput::start"; - m_remoteOutputThread = new RemoteOutputThread(&m_sampleSourceFifo); - m_remoteOutputThread->setDataAddress(m_settings.m_dataAddress, m_settings.m_dataPort); - m_remoteOutputThread->setSamplerate(m_settings.m_sampleRate); - m_remoteOutputThread->setNbBlocksFEC(m_settings.m_nbFECBlocks); - m_remoteOutputThread->connectTimer(m_masterTimer); - m_remoteOutputThread->startWork(); + m_remoteOutputWorker = new RemoteOutputWorker(&m_sampleSourceFifo); + m_remoteOutputWorker->moveToThread(&m_remoteOutputWorkerThread); + m_remoteOutputWorker->setDataAddress(m_settings.m_dataAddress, m_settings.m_dataPort); + m_remoteOutputWorker->setSamplerate(m_settings.m_sampleRate); + m_remoteOutputWorker->setNbBlocksFEC(m_settings.m_nbFECBlocks); + m_remoteOutputWorker->connectTimer(m_masterTimer); + startWorker(); // restart auto rate correction m_lastRemoteTimestampRateCorrection = 0; @@ -100,7 +101,7 @@ bool RemoteOutput::start() m_lastQueueLength = -2; // set first value out of bounds m_chunkSizeCorrection = 0; - m_remoteOutputThread->setTxDelay(m_settings.m_txDelay); + m_remoteOutputWorker->setTxDelay(m_settings.m_txDelay); mutexLocker.unlock(); //applySettings(m_generalSettings, m_settings, true); @@ -119,14 +120,27 @@ void RemoteOutput::stop() qDebug() << "RemoteOutput::stop"; QMutexLocker mutexLocker(&m_mutex); - if(m_remoteOutputThread != 0) + if (m_remoteOutputWorker) { - m_remoteOutputThread->stopWork(); - delete m_remoteOutputThread; - m_remoteOutputThread = 0; + stopWorker(); + delete m_remoteOutputWorker; + m_remoteOutputWorker = nullptr; } } +void RemoteOutput::startWorker() +{ + m_remoteOutputWorker->startWork(); + m_remoteOutputWorkerThread.start(); +} + +void RemoteOutput::stopWorker() +{ + m_remoteOutputWorker->stopWork(); + m_remoteOutputWorkerThread.quit(); + m_remoteOutputWorkerThread.wait(); +} + QByteArray RemoteOutput::serialize() const { return m_settings.serialize(); @@ -189,15 +203,15 @@ bool RemoteOutput::handleMessage(const Message& message) MsgConfigureRemoteOutputWork& conf = (MsgConfigureRemoteOutputWork&) message; bool working = conf.isWorking(); - if (m_remoteOutputThread != 0) + if (m_remoteOutputWorker != 0) { if (working) { - m_remoteOutputThread->startWork(); + m_remoteOutputWorker->startWork(); } else { - m_remoteOutputThread->stopWork(); + m_remoteOutputWorker->stopWork(); } } @@ -230,9 +244,9 @@ bool RemoteOutput::handleMessage(const Message& message) { MsgConfigureRemoteOutputChunkCorrection& conf = (MsgConfigureRemoteOutputChunkCorrection&) message; - if (m_remoteOutputThread != 0) + if (m_remoteOutputWorker != 0) { - m_remoteOutputThread->setChunkCorrection(conf.getChunkCorrection()); + m_remoteOutputWorker->setChunkCorrection(conf.getChunkCorrection()); } return true; @@ -265,8 +279,8 @@ void RemoteOutput::applySettings(const RemoteOutputSettings& settings, bool forc if (force || (m_settings.m_dataAddress != settings.m_dataAddress) || (m_settings.m_dataPort != settings.m_dataPort)) { - if (m_remoteOutputThread != 0) { - m_remoteOutputThread->setDataAddress(settings.m_dataAddress, settings.m_dataPort); + if (m_remoteOutputWorker != 0) { + m_remoteOutputWorker->setDataAddress(settings.m_dataAddress, settings.m_dataPort); } } @@ -274,8 +288,8 @@ void RemoteOutput::applySettings(const RemoteOutputSettings& settings, bool forc { reverseAPIKeys.append("sampleRate"); - if (m_remoteOutputThread != 0) { - m_remoteOutputThread->setSamplerate(settings.m_sampleRate); + if (m_remoteOutputWorker != 0) { + m_remoteOutputWorker->setSamplerate(settings.m_sampleRate); } m_tickMultiplier = (21*NbSamplesForRateCorrection) / (2*settings.m_sampleRate); // two times per sample filling period plus small extension @@ -289,8 +303,8 @@ void RemoteOutput::applySettings(const RemoteOutputSettings& settings, bool forc { reverseAPIKeys.append("nbFECBlocks"); - if (m_remoteOutputThread != 0) { - m_remoteOutputThread->setNbBlocksFEC(settings.m_nbFECBlocks); + if (m_remoteOutputWorker != 0) { + m_remoteOutputWorker->setNbBlocksFEC(settings.m_nbFECBlocks); } changeTxDelay = true; @@ -304,8 +318,8 @@ void RemoteOutput::applySettings(const RemoteOutputSettings& settings, bool forc if (changeTxDelay) { - if (m_remoteOutputThread != 0) { - m_remoteOutputThread->setTxDelay(settings.m_txDelay); + if (m_remoteOutputWorker != 0) { + m_remoteOutputWorker->setTxDelay(settings.m_txDelay); } } @@ -485,7 +499,7 @@ void RemoteOutput::webapiFormatDeviceReport(SWGSDRangel::SWGDeviceReport& respon { uint64_t ts_usecs; response.getRemoteOutputReport()->setBufferRwBalance(m_sampleSourceFifo.getRWBalance()); - response.getRemoteOutputReport()->setSampleCount(m_remoteOutputThread ? (int) m_remoteOutputThread->getSamplesCount(ts_usecs) : 0); + response.getRemoteOutputReport()->setSampleCount(m_remoteOutputWorker ? (int) m_remoteOutputWorker->getSamplesCount(ts_usecs) : 0); } void RemoteOutput::tick() @@ -551,7 +565,7 @@ void RemoteOutput::analyzeApiReply(const QJsonObject& jsonObject, const QString& m_settings.m_centerFrequency = report["deviceCenterFreq"].toInt(); m_centerFrequency = m_settings.m_centerFrequency * 1000; - if (!m_remoteOutputThread) { + if (!m_remoteOutputWorker) { return; } @@ -572,7 +586,7 @@ void RemoteOutput::analyzeApiReply(const QJsonObject& jsonObject, const QString& uint32_t sampleCountDelta, sampleCount; uint64_t timestampUs; - sampleCount = m_remoteOutputThread->getSamplesCount(timestampUs); + sampleCount = m_remoteOutputWorker->getSamplesCount(timestampUs); if (sampleCount < m_lastSampleCount) { sampleCountDelta = (0xFFFFFFFFU - m_lastSampleCount) + sampleCount + 1; diff --git a/plugins/samplesink/remoteoutput/remoteoutput.h b/plugins/samplesink/remoteoutput/remoteoutput.h index b46d0838a..d5ab10261 100644 --- a/plugins/samplesink/remoteoutput/remoteoutput.h +++ b/plugins/samplesink/remoteoutput/remoteoutput.h @@ -26,12 +26,13 @@ #include #include #include +#include #include "dsp/devicesamplesink.h" #include "remoteoutputsettings.h" -class RemoteOutputThread; +class RemoteOutputWorker; class DeviceAPI; class QNetworkAccessManager; class QNetworkReply; @@ -180,7 +181,8 @@ private: QMutex m_mutex; RemoteOutputSettings m_settings; uint64_t m_centerFrequency; - RemoteOutputThread* m_remoteOutputThread; + RemoteOutputWorker* m_remoteOutputWorker; + QThread m_remoteOutputWorkerThread; QString m_deviceDescription; std::time_t m_startingTimeStamp; const QTimer& m_masterTimer; @@ -200,6 +202,8 @@ private: int m_chunkSizeCorrection; static const uint32_t NbSamplesForRateCorrection; + void startWorker(); + void stopWorker(); void applySettings(const RemoteOutputSettings& settings, bool force = false); void webapiFormatDeviceReport(SWGSDRangel::SWGDeviceReport& response); diff --git a/plugins/samplesink/remoteoutput/remoteoutputthread.cpp b/plugins/samplesink/remoteoutput/remoteoutputworker.cpp similarity index 79% rename from plugins/samplesink/remoteoutput/remoteoutputthread.cpp rename to plugins/samplesink/remoteoutput/remoteoutputworker.cpp index 8e907b2ea..a1a88c31b 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputthread.cpp +++ b/plugins/samplesink/remoteoutput/remoteoutputworker.cpp @@ -23,10 +23,10 @@ #include "dsp/samplesourcefifo.h" #include "util/timeutil.h" -#include "remoteoutputthread.h" +#include "remoteoutputworker.h" -RemoteOutputThread::RemoteOutputThread(SampleSourceFifo* sampleFifo, QObject* parent) : - QThread(parent), +RemoteOutputWorker::RemoteOutputWorker(SampleSourceFifo* sampleFifo, QObject* parent) : + QObject(parent), m_running(false), m_samplesChunkSize(0), m_sampleFifo(sampleFifo), @@ -39,39 +39,33 @@ RemoteOutputThread::RemoteOutputThread(SampleSourceFifo* sampleFifo, QObject* pa { } -RemoteOutputThread::~RemoteOutputThread() +RemoteOutputWorker::~RemoteOutputWorker() { if (m_running) { stopWork(); } } -void RemoteOutputThread::startWork() +void RemoteOutputWorker::startWork() { - qDebug() << "RemoteOutputThread::startWork: "; + qDebug() << "RemoteOutputWorker::startWork: "; m_udpSinkFEC.startSender(); m_maxThrottlems = 0; - m_startWaitMutex.lock(); - m_elapsedTimer.start(); - start(); - while(!m_running) - m_startWaiter.wait(&m_startWaitMutex, 100); - m_startWaitMutex.unlock(); + m_running = true; } -void RemoteOutputThread::stopWork() +void RemoteOutputWorker::stopWork() { - qDebug() << "RemoteOutputThread::stopWork"; + qDebug() << "RemoteOutputWorker::stopWork"; m_running = false; - wait(); m_udpSinkFEC.stopSender(); } -void RemoteOutputThread::setSamplerate(int samplerate) +void RemoteOutputWorker::setSamplerate(int samplerate) { if (samplerate != m_samplerate) { - qDebug() << "RemoteOutputThread::setSamplerate:" + qDebug() << "RemoteOutputWorker::setSamplerate:" << " new:" << samplerate << " old:" << m_samplerate; @@ -102,26 +96,13 @@ void RemoteOutputThread::setSamplerate(int samplerate) } } -void RemoteOutputThread::run() +void RemoteOutputWorker::connectTimer(const QTimer& timer) { - m_running = true; - m_startWaiter.wakeAll(); - - while(m_running) // actual work is in the tick() function - { - sleep(1); - } - - m_running = false; -} - -void RemoteOutputThread::connectTimer(const QTimer& timer) -{ - qDebug() << "RemoteOutputThread::connectTimer"; + qDebug() << "RemoteOutputWorker::connectTimer"; connect(&timer, SIGNAL(timeout()), this, SLOT(tick())); } -void RemoteOutputThread::tick() +void RemoteOutputWorker::tick() { if (m_running) { @@ -157,7 +138,7 @@ void RemoteOutputThread::tick() } } -uint32_t RemoteOutputThread::getSamplesCount(uint64_t& ts_usecs) const +uint32_t RemoteOutputWorker::getSamplesCount(uint64_t& ts_usecs) const { ts_usecs = TimeUtil::nowus(); return m_samplesCount; diff --git a/plugins/samplesink/remoteoutput/remoteoutputthread.h b/plugins/samplesink/remoteoutput/remoteoutputworker.h similarity index 88% rename from plugins/samplesink/remoteoutput/remoteoutputthread.h rename to plugins/samplesink/remoteoutput/remoteoutputworker.h index ce992a59e..584615e80 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputthread.h +++ b/plugins/samplesink/remoteoutput/remoteoutputworker.h @@ -15,17 +15,15 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// -#ifndef INCLUDE_REMOTEOUTPUTTHREAD_H -#define INCLUDE_REMOTEOUTPUTTHREAD_H +#ifndef INCLUDE_REMOTEOUTPUTWORKER_H +#define INCLUDE_REMOTEOUTPUTWORKER_H #include #include #include #include -#include -#include -#include +#include #include #include @@ -39,12 +37,12 @@ class SampleSourceFifo; struct timeval; -class RemoteOutputThread : public QThread { +class RemoteOutputWorker : public QObject { Q_OBJECT public: - RemoteOutputThread(SampleSourceFifo* sampleFifo, QObject* parent = 0); - ~RemoteOutputThread(); + RemoteOutputWorker(SampleSourceFifo* sampleFifo, QObject* parent = 0); + ~RemoteOutputWorker(); void startWork(); void stopWork(); @@ -63,8 +61,6 @@ public: void connectTimer(const QTimer& timer); private: - QMutex m_startWaitMutex; - QWaitCondition m_startWaiter; volatile bool m_running; int m_samplesChunkSize; @@ -80,8 +76,6 @@ private: UDPSinkFEC m_udpSinkFEC; - void run(); - private slots: void tick(); }; diff --git a/plugins/samplesink/remoteoutput/udpsinkfecworker.cpp b/plugins/samplesink/remoteoutput/udpsinkfecworker.cpp deleted file mode 100644 index 91072200b..000000000 --- a/plugins/samplesink/remoteoutput/udpsinkfecworker.cpp +++ /dev/null @@ -1,207 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////////// -// Copyright (C) 2017 Edouard Griffiths, F4EXB // -// // -// This program is free software; you can redistribute it and/or modify // -// it under the terms of the GNU General Public License as published by // -// the Free Software Foundation as version 3 of the License, or // -// (at your option) any later version. // -// // -// This program is distributed in the hope that it will be useful, // -// but WITHOUT ANY WARRANTY; without even the implied warranty of // -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // -// GNU General Public License V3 for more details. // -// // -// You should have received a copy of the GNU General Public License // -// along with this program. If not, see . // -/////////////////////////////////////////////////////////////////////////////////// - -#include "udpsinkfecworker.h" - -#include - - -MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgUDPFECEncodeAndSend, Message) -MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgConfigureRemoteAddress, Message) -MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgStartStop, Message) - -UDPSinkFECWorker::UDPSinkFECWorker() : - m_running(false), - m_udpSocket(0), - m_remotePort(9090) -{ - m_cm256Valid = m_cm256.isInitialized(); - connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); -} - -UDPSinkFECWorker::~UDPSinkFECWorker() -{ -} - -void UDPSinkFECWorker::startStop(bool start) -{ - MsgStartStop *msg = MsgStartStop::create(start); - m_inputMessageQueue.push(msg); -} - -void UDPSinkFECWorker::startWork() -{ - qDebug("UDPSinkFECWorker::startWork"); - m_startWaitMutex.lock(); - m_udpSocket = new QUdpSocket(this); - - start(); - - while(!m_running) { - m_startWaiter.wait(&m_startWaitMutex, 100); - } - - m_startWaitMutex.unlock(); -} - -void UDPSinkFECWorker::stopWork() -{ - qDebug("UDPSinkFECWorker::stopWork"); - delete m_udpSocket; - m_udpSocket = 0; - m_running = false; - wait(); -} - -void UDPSinkFECWorker::run() -{ - m_running = true; - m_startWaiter.wakeAll(); - - qDebug("UDPSinkFECWorker::process: started"); - - while (m_running) - { - sleep(1); - } - m_running = false; - - qDebug("UDPSinkFECWorker::process: stopped"); -} - -void UDPSinkFECWorker::pushTxFrame(RemoteSuperBlock *txBlocks, - uint32_t nbBlocksFEC, - uint32_t txDelay, - uint16_t frameIndex) -{ - //qDebug("UDPSinkFECWorker::pushTxFrame. %d", m_inputMessageQueue.size()); - m_inputMessageQueue.push(MsgUDPFECEncodeAndSend::create(txBlocks, nbBlocksFEC, txDelay, frameIndex)); -} - -void UDPSinkFECWorker::setRemoteAddress(const QString& address, uint16_t port) -{ - m_inputMessageQueue.push(MsgConfigureRemoteAddress::create(address, port)); -} - -void UDPSinkFECWorker::handleInputMessages() -{ - Message* message; - - while ((message = m_inputMessageQueue.pop()) != 0) - { - if (MsgUDPFECEncodeAndSend::match(*message)) - { - MsgUDPFECEncodeAndSend *sendMsg = (MsgUDPFECEncodeAndSend *) message; - encodeAndTransmit(sendMsg->getTxBlocks(), sendMsg->getFrameIndex(), sendMsg->getNbBlocsFEC(), sendMsg->getTxDelay()); - } - else if (MsgConfigureRemoteAddress::match(*message)) - { - qDebug("UDPSinkFECWorker::handleInputMessages: %s", message->getIdentifier()); - MsgConfigureRemoteAddress *addressMsg = (MsgConfigureRemoteAddress *) message; - m_remoteAddress = addressMsg->getAddress(); - m_remotePort = addressMsg->getPort(); - m_remoteHostAddress.setAddress(addressMsg->getAddress()); - } - else if (MsgStartStop::match(*message)) - { - MsgStartStop* notif = (MsgStartStop*) message; - qDebug("UDPSinkFECWorker::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop"); - - if (notif->getStartStop()) { - startWork(); - } else { - stopWork(); - } - } - - delete message; - } -} - -void UDPSinkFECWorker::encodeAndTransmit(RemoteSuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay) -{ - CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder - CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder - RemoteProtectedBlock fecBlocks[256]; //!< FEC data - - if ((nbBlocksFEC == 0) || !m_cm256Valid) - { - if (m_udpSocket) - { - for (unsigned int i = 0; i < RemoteNbOrginalBlocks; i++) - { - m_udpSocket->writeDatagram((const char *) &txBlockx[i], RemoteUdpSize, m_remoteHostAddress, m_remotePort); - usleep(txDelay); - } - } - } - else - { - cm256Params.BlockBytes = sizeof(RemoteProtectedBlock); - cm256Params.OriginalCount = RemoteNbOrginalBlocks; - cm256Params.RecoveryCount = nbBlocksFEC; - - - // Fill pointers to data - for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; ++i) - { - if (i >= cm256Params.OriginalCount) { - memset((char *) &txBlockx[i].m_protectedBlock, 0, sizeof(RemoteProtectedBlock)); - } - - txBlockx[i].m_header.m_frameIndex = frameIndex; - txBlockx[i].m_header.m_blockIndex = i; - txBlockx[i].m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); - txBlockx[i].m_header.m_sampleBits = SDR_RX_SAMP_SZ; - descriptorBlocks[i].Block = (void *) &(txBlockx[i].m_protectedBlock); - descriptorBlocks[i].Index = txBlockx[i].m_header.m_blockIndex; - } - - // Encode FEC blocks - if (m_cm256.cm256_encode(cm256Params, descriptorBlocks, fecBlocks)) - { - qDebug("UDPSinkFECWorker::encodeAndTransmit: CM256 encode failed. No transmission."); - return; - } - - // Merge FEC with data to transmit - for (int i = 0; i < cm256Params.RecoveryCount; i++) - { - txBlockx[i + cm256Params.OriginalCount].m_protectedBlock = fecBlocks[i]; - } - - // Transmit all blocks - if (m_udpSocket) - { - for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) - { - #ifdef REMOTE_PUNCTURE - if (i == REMOTE_PUNCTURE) { - continue; - } - #endif - - m_udpSocket->writeDatagram((const char *) &txBlockx[i], RemoteUdpSize, m_remoteHostAddress, m_remotePort); - usleep(txDelay); - } - } - } -} - - - - diff --git a/plugins/samplesink/remoteoutput/udpsinkfecworker.h b/plugins/samplesink/remoteoutput/udpsinkfecworker.h deleted file mode 100644 index c9e5a3bb3..000000000 --- a/plugins/samplesink/remoteoutput/udpsinkfecworker.h +++ /dev/null @@ -1,148 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////////// -// Copyright (C) 2017 Edouard Griffiths, F4EXB // -// // -// This program is free software; you can redistribute it and/or modify // -// it under the terms of the GNU General Public License as published by // -// the Free Software Foundation as version 3 of the License, or // -// (at your option) any later version. // -// // -// This program is distributed in the hope that it will be useful, // -// but WITHOUT ANY WARRANTY; without even the implied warranty of // -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // -// GNU General Public License V3 for more details. // -// // -// You should have received a copy of the GNU General Public License // -// along with this program. If not, see . // -/////////////////////////////////////////////////////////////////////////////////// - -#ifndef PLUGINS_SAMPLESINK_REMOTEOUTPUT_UDPSINKFECWORKER_H_ -#define PLUGINS_SAMPLESINK_REMOTEOUTPUT_UDPSINKFECWORKER_H_ - -#include -#include -#include -#include -#include - -#include "cm256cc/cm256.h" - -#include "util/messagequeue.h" -#include "util/message.h" - -class QUdpSocket; - -class UDPSinkFECWorker : public QThread -{ - Q_OBJECT -public: - class MsgUDPFECEncodeAndSend : public Message - { - MESSAGE_CLASS_DECLARATION - public: - RemoteSuperBlock *getTxBlocks() const { return m_txBlockx; } - uint32_t getNbBlocsFEC() const { return m_nbBlocksFEC; } - uint32_t getTxDelay() const { return m_txDelay; } - uint16_t getFrameIndex() const { return m_frameIndex; } - - static MsgUDPFECEncodeAndSend* create( - RemoteSuperBlock *txBlocks, - uint32_t nbBlocksFEC, - uint32_t txDelay, - uint16_t frameIndex) - { - return new MsgUDPFECEncodeAndSend(txBlocks, nbBlocksFEC, txDelay, frameIndex); - } - - private: - RemoteSuperBlock *m_txBlockx; - uint32_t m_nbBlocksFEC; - uint32_t m_txDelay; - uint16_t m_frameIndex; - - MsgUDPFECEncodeAndSend( - RemoteSuperBlock *txBlocks, - uint32_t nbBlocksFEC, - uint32_t txDelay, - uint16_t frameIndex) : - m_txBlockx(txBlocks), - m_nbBlocksFEC(nbBlocksFEC), - m_txDelay(txDelay), - m_frameIndex(frameIndex) - {} - }; - - class MsgConfigureRemoteAddress : public Message - { - MESSAGE_CLASS_DECLARATION - public: - const QString& getAddress() const { return m_address; } - uint16_t getPort() const { return m_port; } - - static MsgConfigureRemoteAddress* create(const QString& address, uint16_t port) - { - return new MsgConfigureRemoteAddress(address, port); - } - - private: - QString m_address; - uint16_t m_port; - - MsgConfigureRemoteAddress(const QString& address, uint16_t port) : - m_address(address), - m_port(port) - {} - }; - - class MsgStartStop : public Message { - MESSAGE_CLASS_DECLARATION - - public: - bool getStartStop() const { return m_startStop; } - - static MsgStartStop* create(bool startStop) { - return new MsgStartStop(startStop); - } - - protected: - bool m_startStop; - - MsgStartStop(bool startStop) : - Message(), - m_startStop(startStop) - { } - }; - - UDPSinkFECWorker(); - ~UDPSinkFECWorker(); - - void startStop(bool start); - - void pushTxFrame(RemoteSuperBlock *txBlocks, - uint32_t nbBlocksFEC, - uint32_t txDelay, - uint16_t frameIndex); - void setRemoteAddress(const QString& address, uint16_t port); - - MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication - -private slots: - void handleInputMessages(); - -private: - void startWork(); - void stopWork(); - void run(); - void encodeAndTransmit(RemoteSuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay); - - QMutex m_startWaitMutex; - QWaitCondition m_startWaiter; - volatile bool m_running; - CM256 m_cm256; //!< CM256 library object - bool m_cm256Valid; //!< true if CM256 library is initialized correctly - QUdpSocket *m_udpSocket; - QString m_remoteAddress; - uint16_t m_remotePort; - QHostAddress m_remoteHostAddress; -}; - -#endif /* PLUGINS_SAMPLESINK_REMOTEOUTPUT_UDPSINKFECWORKER_H_ */