SDRDaemonSink: make UDP worker a QThread

This commit is contained in:
f4exb 2018-09-17 03:33:18 +02:00
parent 0d115ac342
commit 1e02b85702
5 changed files with 119 additions and 51 deletions

View File

@ -48,6 +48,7 @@ SDRdaemonSinkThread::~SDRdaemonSinkThread()
void SDRdaemonSinkThread::startWork()
{
qDebug() << "SDRdaemonSinkThread::startWork: ";
m_udpSinkFEC.start();
m_maxThrottlems = 0;
m_startWaitMutex.lock();
m_elapsedTimer.start();
@ -62,6 +63,7 @@ void SDRdaemonSinkThread::stopWork()
qDebug() << "SDRdaemonSinkThread::stopWork";
m_running = false;
wait();
m_udpSinkFEC.stop();
}
void SDRdaemonSinkThread::setSamplerate(int samplerate)

View File

@ -34,33 +34,39 @@ UDPSinkFEC::UDPSinkFEC() :
m_txBlockIndex(0),
m_txBlocksIndex(0),
m_frameCount(0),
m_sampleIndex(0)
m_sampleIndex(0),
m_udpWorker(0),
m_remoteAddress("127.0.0.1"),
m_remotePort(9090)
{
memset((char *) m_txBlocks, 0, 4*256*sizeof(SDRDaemonSuperBlock));
memset((char *) &m_superBlock, 0, sizeof(SDRDaemonSuperBlock));
m_currentMetaFEC.init();
m_bufMeta = new uint8_t[m_udpSize];
m_buf = new uint8_t[m_udpSize];
m_udpThread = new QThread();
m_udpWorker = new UDPSinkFECWorker();
m_udpWorker->moveToThread(m_udpThread);
connect(m_udpThread, SIGNAL(started()), m_udpWorker, SLOT(process()));
connect(m_udpWorker, SIGNAL(finished()), m_udpThread, SLOT(quit()));
m_udpThread->start();
}
UDPSinkFEC::~UDPSinkFEC()
{
m_udpWorker->stop();
m_udpThread->wait();
delete[] m_buf;
delete[] m_bufMeta;
delete m_udpWorker;
delete m_udpThread;
}
void UDPSinkFEC::start()
{
m_udpWorker = new UDPSinkFECWorker();
m_udpWorker->setRemoteAddress(m_remoteAddress, m_remotePort);
m_udpWorker->startStop(true);
}
void UDPSinkFEC::stop()
{
if (m_udpWorker)
{
m_udpWorker->startStop(false);
m_udpWorker->deleteLater();
m_udpWorker = 0;
}
}
void UDPSinkFEC::setTxDelay(float txDelayRatio)
@ -93,7 +99,12 @@ void UDPSinkFEC::setSampleRate(uint32_t sampleRate)
void UDPSinkFEC::setRemoteAddress(const QString& address, uint16_t port)
{
qDebug() << "UDPSinkFEC::setRemoteAddress: address: " << address << " port: " << port;
m_udpWorker->setRemoteAddress(address, port);
m_remoteAddress = address;
m_remotePort = port;
if (m_udpWorker) {
m_udpWorker->setRemoteAddress(m_remoteAddress, m_remotePort);
}
}
void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunkSize)
@ -183,7 +194,9 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk
int nbBlocksFEC = m_nbBlocksFEC;
int txDelay = m_txDelay;
m_udpWorker->pushTxFrame(m_txBlocks[m_txBlocksIndex], nbBlocksFEC, txDelay, m_frameCount);
if (m_udpWorker) {
m_udpWorker->pushTxFrame(m_txBlocks[m_txBlocksIndex], nbBlocksFEC, txDelay, m_frameCount);
}
m_txBlocksIndex = (m_txBlocksIndex + 1) % 4;
m_txBlockIndex = 0;

View File

@ -23,7 +23,6 @@
#include <QObject>
#include <QHostAddress>
#include <QString>
#include <QThread>
#include "dsp/dsptypes.h"
#include "util/CRC64.h"
@ -46,6 +45,9 @@ public:
/** Destroy UDP sink */
~UDPSinkFEC();
void start();
void stop();
/**
* Write IQ samples
*/
@ -95,8 +97,9 @@ private:
uint16_t m_frameCount; //!< transmission frame count
int m_sampleIndex; //!< Current sample index in protected block data
QThread *m_udpThread;
UDPSinkFECWorker *m_udpWorker;
QString m_remoteAddress;
uint16_t m_remotePort;
};
#endif /* PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFEC_H_ */

View File

@ -18,6 +18,7 @@
MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgUDPFECEncodeAndSend, Message)
MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgConfigureRemoteAddress, Message)
MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgStartStop, Message)
UDPSinkFECWorker::UDPSinkFECWorker() :
m_running(false),
@ -29,8 +30,45 @@ UDPSinkFECWorker::UDPSinkFECWorker() :
UDPSinkFECWorker::~UDPSinkFECWorker()
{
disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()));
m_inputMessageQueue.clear();
}
void UDPSinkFECWorker::startStop(bool start)
{
MsgStartStop *msg = MsgStartStop::create(start);
m_inputMessageQueue.push(msg);
}
void UDPSinkFECWorker::startWork()
{
qDebug("UDPSinkFECWorker::startWork");
m_startWaitMutex.lock();
start();
while(!m_running)
m_startWaiter.wait(&m_startWaitMutex, 100);
m_startWaitMutex.unlock();
}
void UDPSinkFECWorker::stopWork()
{
qDebug("UDPSinkFECWorker::stopWork");
m_running = false;
wait();
}
void UDPSinkFECWorker::run()
{
m_running = true;
m_startWaiter.wakeAll();
qDebug("UDPSinkFECWorker::process: started");
while (m_running)
{
sleep(1);
}
m_running = false;
qDebug("UDPSinkFECWorker::process: stopped");
}
void UDPSinkFECWorker::pushTxFrame(SDRDaemonSuperBlock *txBlocks,
@ -47,26 +85,6 @@ void UDPSinkFECWorker::setRemoteAddress(const QString& address, uint16_t port)
m_inputMessageQueue.push(MsgConfigureRemoteAddress::create(address, port));
}
void UDPSinkFECWorker::process()
{
m_running = true;
qDebug("UDPSinkFECWorker::process: started");
while (m_running)
{
usleep(250000);
}
qDebug("UDPSinkFECWorker::process: stopped");
emit finished();
}
void UDPSinkFECWorker::stop()
{
m_running = false;
}
void UDPSinkFECWorker::handleInputMessages()
{
Message* message;
@ -85,6 +103,17 @@ void UDPSinkFECWorker::handleInputMessages()
m_remoteAddress = addressMsg->getAddress();
m_remotePort = addressMsg->getPort();
}
else if (MsgStartStop::match(*message))
{
MsgStartStop* notif = (MsgStartStop*) message;
qDebug("DaemonSinkThread::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop");
if (notif->getStartStop()) {
startWork();
} else {
stopWork();
}
}
delete message;
}

View File

@ -17,7 +17,9 @@
#ifndef PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFECWORKER_H_
#define PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFECWORKER_H_
#include <QObject>
#include <QThread>
#include <QMutex>
#include <QWaitCondition>
#include "cm256.h"
@ -27,7 +29,7 @@
#include "UDPSocket.h"
class UDPSinkFECWorker : public QObject
class UDPSinkFECWorker : public QThread
{
Q_OBJECT
public:
@ -89,31 +91,50 @@ public:
{}
};
class MsgStartStop : public Message {
MESSAGE_CLASS_DECLARATION
public:
bool getStartStop() const { return m_startStop; }
static MsgStartStop* create(bool startStop) {
return new MsgStartStop(startStop);
}
protected:
bool m_startStop;
MsgStartStop(bool startStop) :
Message(),
m_startStop(startStop)
{ }
};
UDPSinkFECWorker();
~UDPSinkFECWorker();
void startStop(bool start);
void pushTxFrame(SDRDaemonSuperBlock *txBlocks,
uint32_t nbBlocksFEC,
uint32_t txDelay,
uint16_t frameIndex);
void setRemoteAddress(const QString& address, uint16_t port);
void stop();
MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication
signals:
void finished();
public slots:
void process();
private slots:
void handleInputMessages();
private:
void startWork();
void stopWork();
void run();
void encodeAndTransmit(SDRDaemonSuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay);
volatile bool m_running;
QMutex m_startWaitMutex;
QWaitCondition m_startWaiter;
bool m_running;
CM256 m_cm256; //!< CM256 library object
bool m_cm256Valid; //!< true if CM256 library is initialized correctly
UDPSocket m_socket;