From d482471a59b89e749f00609aad369d606a334857 Mon Sep 17 00:00:00 2001 From: f4exb Date: Thu, 23 Dec 2021 01:47:38 +0100 Subject: [PATCH] Remote Sink: reworked threading model. Fixed sigabort at exit time --- plugins/channelrx/remotesink/remotesink.cpp | 20 ++++--- plugins/channelrx/remotesink/remotesink.h | 3 +- .../remotesink/remotesinkbaseband.cpp | 40 +++++++++---- .../channelrx/remotesink/remotesinkbaseband.h | 6 +- .../channelrx/remotesink/remotesinksender.cpp | 60 ++++++++++++------- .../channelrx/remotesink/remotesinksender.h | 8 ++- .../channelrx/remotesink/remotesinksink.cpp | 46 ++++++++++---- plugins/channelrx/remotesink/remotesinksink.h | 11 ++-- 8 files changed, 136 insertions(+), 58 deletions(-) diff --git a/plugins/channelrx/remotesink/remotesink.cpp b/plugins/channelrx/remotesink/remotesink.cpp index 505f0715c..ef2259428 100644 --- a/plugins/channelrx/remotesink/remotesink.cpp +++ b/plugins/channelrx/remotesink/remotesink.cpp @@ -55,9 +55,8 @@ RemoteSink::RemoteSink(DeviceAPI *deviceAPI) : { setObjectName(m_channelId); - m_thread = new QThread(this); m_basebandSink = new RemoteSinkBaseband(); - m_basebandSink->moveToThread(m_thread); + m_basebandSink->moveToThread(&m_thread); applySettings(m_settings, true); @@ -74,8 +73,12 @@ RemoteSink::~RemoteSink() delete m_networkManager; m_deviceAPI->removeChannelSinkAPI(this); m_deviceAPI->removeChannelSink(this); + + if (m_basebandSink->isRunning()) { + stop(); + } + delete m_basebandSink; - delete m_thread; } uint32_t RemoteSink::getNumberOfDeviceStreams() const @@ -93,21 +96,20 @@ void RemoteSink::start() { qDebug("RemoteSink::start: m_basebandSampleRate: %d", m_basebandSampleRate); m_basebandSink->reset(); + m_basebandSink->startWork(); + m_thread.start(); if (m_basebandSampleRate != 0) { m_basebandSink->setBasebandSampleRate(m_basebandSampleRate); } - - m_thread->start(); - m_basebandSink->startSender(); } void RemoteSink::stop() { qDebug("RemoteSink::stop"); - m_basebandSink->stopSender(); - m_thread->exit(); - m_thread->wait(); + m_basebandSink->stopWork(); + m_thread.quit(); + m_thread.wait(); } bool RemoteSink::handleMessage(const Message& cmd) diff --git a/plugins/channelrx/remotesink/remotesink.h b/plugins/channelrx/remotesink/remotesink.h index c78cf49d6..26b3ea8c8 100644 --- a/plugins/channelrx/remotesink/remotesink.h +++ b/plugins/channelrx/remotesink/remotesink.h @@ -26,6 +26,7 @@ #include #include +#include #include "dsp/basebandsamplesink.h" #include "channel/channelapi.h" @@ -115,7 +116,7 @@ public: private: DeviceAPI *m_deviceAPI; - QThread *m_thread; + QThread m_thread; RemoteSinkBaseband *m_basebandSink; RemoteSinkSettings m_settings; diff --git a/plugins/channelrx/remotesink/remotesinkbaseband.cpp b/plugins/channelrx/remotesink/remotesinkbaseband.cpp index 05f1c2500..daab8eeab 100644 --- a/plugins/channelrx/remotesink/remotesinkbaseband.cpp +++ b/plugins/channelrx/remotesink/remotesinkbaseband.cpp @@ -28,18 +28,9 @@ MESSAGE_CLASS_DEFINITION(RemoteSinkBaseband::MsgConfigureRemoteSinkBaseband, Mes RemoteSinkBaseband::RemoteSinkBaseband() : m_mutex(QMutex::Recursive) { + qDebug("RemoteSinkBaseband::RemoteSinkBaseband"); m_sampleFifo.setSize(SampleSinkFifo::getSizePolicy(48000)); m_channelizer = new DownChannelizer(&m_sink); - - qDebug("RemoteSinkBaseband::RemoteSinkBaseband"); - QObject::connect( - &m_sampleFifo, - &SampleSinkFifo::dataReady, - this, - &RemoteSinkBaseband::handleData, - Qt::QueuedConnection - ); - connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); } @@ -55,6 +46,35 @@ void RemoteSinkBaseband::reset() m_sink.init(); } +void RemoteSinkBaseband::startWork() +{ + QMutexLocker mutexLocker(&m_mutex); + QObject::connect( + &m_sampleFifo, + &SampleSinkFifo::dataReady, + this, + &RemoteSinkBaseband::handleData, + Qt::QueuedConnection + ); + connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); + m_sink.start(); + m_running = true; +} + +void RemoteSinkBaseband::stopWork() +{ + QMutexLocker mutexLocker(&m_mutex); + m_sink.stop(); + disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); + QObject::disconnect( + &m_sampleFifo, + &SampleSinkFifo::dataReady, + this, + &RemoteSinkBaseband::handleData + ); + m_running = false; +} + void RemoteSinkBaseband::feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end) { m_sampleFifo.write(begin, end); diff --git a/plugins/channelrx/remotesink/remotesinkbaseband.h b/plugins/channelrx/remotesink/remotesinkbaseband.h index 5c585cdf3..af7de697e 100644 --- a/plugins/channelrx/remotesink/remotesinkbaseband.h +++ b/plugins/channelrx/remotesink/remotesinkbaseband.h @@ -62,8 +62,9 @@ public: void reset(); void feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end); - void startSender() { m_sink.startSender(); } - void stopSender() { m_sink.stopSender(); } + void startWork(); + void stopWork(); + bool isRunning() const { return m_running; } void setNbTxBytes(uint32_t nbTxBytes) { m_sink.setNbTxBytes(nbTxBytes); } MessageQueue *getInputMessageQueue() { return &m_inputMessageQueue; } //!< Get the queue for asynchronous inbound communication @@ -71,6 +72,7 @@ public: void setBasebandSampleRate(int sampleRate); private: + bool m_running; SampleSinkFifo m_sampleFifo; DownChannelizer *m_channelizer; int m_basebandSampleRate; diff --git a/plugins/channelrx/remotesink/remotesinksender.cpp b/plugins/channelrx/remotesink/remotesinksender.cpp index ed0e894d8..8ec91c1d5 100644 --- a/plugins/channelrx/remotesink/remotesinksender.cpp +++ b/plugins/channelrx/remotesink/remotesinksender.cpp @@ -21,11 +21,7 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// - -#include -#include - -#include +#include #include "cm256cc/cm256.h" @@ -33,14 +29,24 @@ #include "remotesinksender.h" RemoteSinkSender::RemoteSinkSender() : + m_running(false), m_fifo(20, this), m_address(QHostAddress::LocalHost), - m_socket(nullptr) + m_socket(this) { qDebug("RemoteSinkSender::RemoteSinkSender"); m_cm256p = m_cm256.isInitialized() ? &m_cm256 : nullptr; - m_socket = new QUdpSocket(this); +} +RemoteSinkSender::~RemoteSinkSender() +{ + qDebug("RemoteSinkSender::~RemoteSinkSender"); + m_socket.close(); +} + +bool RemoteSinkSender::startWork() +{ + qDebug("RemoteSinkSender::startWork"); QObject::connect( &m_fifo, &RemoteSinkFifo::dataBlockServed, @@ -48,13 +54,31 @@ RemoteSinkSender::RemoteSinkSender() : &RemoteSinkSender::handleData, Qt::QueuedConnection ); + connect(thread(), SIGNAL(started()), this, SLOT(started())); + connect(thread(), SIGNAL(finished()), this, SLOT(finished())); + m_running = true; + return m_running; } -RemoteSinkSender::~RemoteSinkSender() +void RemoteSinkSender::started() { - qDebug("RemoteSinkSender::~RemoteSinkSender"); - m_socket->close(); - m_socket->deleteLater(); + disconnect(thread(), SIGNAL(started()), this, SLOT(started())); +} + +void RemoteSinkSender::stopWork() +{ + qDebug("RemoteSinkSender::stopWork"); +} + +void RemoteSinkSender::finished() +{ + // Close any existing connection + if (m_socket.isOpen()) { + m_socket.close(); + } + + m_running = false; + disconnect(thread(), SIGNAL(finished()), this, SLOT(finished())); } RemoteDataFrame *RemoteSinkSender::getDataFrame() @@ -91,11 +115,8 @@ void RemoteSinkSender::sendDataFrame(RemoteDataFrame *dataFrame) if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode { - if (m_socket) - { - for (int i = 0; i < RemoteNbOrginalBlocks; i++) { // send block via UDP - m_socket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_address, dataPort); - } + for (int i = 0; i < RemoteNbOrginalBlocks; i++) { // send block via UDP + m_socket.writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_address, dataPort); } } else @@ -133,11 +154,8 @@ void RemoteSinkSender::sendDataFrame(RemoteDataFrame *dataFrame) } // Transmit all blocks - if (m_socket) - { - for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) { // send block via UDP - m_socket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_address, dataPort); - } + for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) { // send block via UDP + m_socket.writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_address, dataPort); } } diff --git a/plugins/channelrx/remotesink/remotesinksender.h b/plugins/channelrx/remotesink/remotesinksender.h index f73a3e411..a64c8ad57 100644 --- a/plugins/channelrx/remotesink/remotesinksender.h +++ b/plugins/channelrx/remotesink/remotesinksender.h @@ -28,6 +28,7 @@ #include #include #include +#include #include "cm256cc/cm256.h" @@ -47,19 +48,24 @@ public: RemoteSinkSender(); ~RemoteSinkSender(); + bool startWork(); + void stopWork(); RemoteDataFrame *getDataFrame(); private: + volatile bool m_running; RemoteSinkFifo m_fifo; CM256 m_cm256; CM256 *m_cm256p; QHostAddress m_address; - QUdpSocket *m_socket; + QUdpSocket m_socket; void sendDataFrame(RemoteDataFrame *dataFrame); private slots: + void started(); + void finished(); void handleData(); }; diff --git a/plugins/channelrx/remotesink/remotesinksink.cpp b/plugins/channelrx/remotesink/remotesinksink.cpp index f13b9bf13..eb638eab1 100644 --- a/plugins/channelrx/remotesink/remotesinksink.cpp +++ b/plugins/channelrx/remotesink/remotesinksink.cpp @@ -28,6 +28,8 @@ #include "remotesinksink.h" RemoteSinkSink::RemoteSinkSink() : + m_running(false), + m_remoteSinkSender(nullptr), m_txBlockIndex(0), m_frameCount(0), m_sampleIndex(0), @@ -41,32 +43,56 @@ RemoteSinkSink::RemoteSinkSink() : m_dataPort(9090) { qDebug("RemoteSinkSink::RemoteSinkSink"); - - m_senderThread = new QThread(this); - m_remoteSinkSender = new RemoteSinkSender(); - m_remoteSinkSender->moveToThread(m_senderThread); - applySettings(m_settings, true); } RemoteSinkSink::~RemoteSinkSink() { qDebug("RemoteSinkSink::~RemoteSinkSink"); - delete m_remoteSinkSender; - delete m_senderThread; + stop(); +} + +void RemoteSinkSink::start() +{ + qDebug("RemoteSinkSink::start"); + + if (m_running) { + stop(); + } + + m_remoteSinkSender = new RemoteSinkSender(); + m_remoteSinkSender->moveToThread(&m_senderThread); + startSender(); + m_running = true; +} + +void RemoteSinkSink::stop() +{ + qDebug("RemoteSinkSink::stop"); + + if (m_remoteSinkSender) + { + stopSender(); + m_remoteSinkSender->deleteLater(); + m_remoteSinkSender = nullptr; + } + + m_running = false; } void RemoteSinkSink::startSender() { qDebug("RemoteSinkSink::startSender"); - m_senderThread->start(); + m_remoteSinkSender->startWork(); + m_senderThread.start(); } void RemoteSinkSink::stopSender() { qDebug("RemoteSinkSink::stopSender"); - m_senderThread->exit(); - m_senderThread->wait(); + m_remoteSinkSender->stopWork(); + m_senderThread.quit(); + m_senderThread.wait(); } void RemoteSinkSink::init() diff --git a/plugins/channelrx/remotesink/remotesinksink.h b/plugins/channelrx/remotesink/remotesinksink.h index cfe391f12..34730daf1 100644 --- a/plugins/channelrx/remotesink/remotesinksink.h +++ b/plugins/channelrx/remotesink/remotesinksink.h @@ -19,6 +19,7 @@ #define INCLUDE_REMOTESINKSINK_H_ #include +#include #include "dsp/channelsamplesink.h" #include "channel/remotedatablock.h" @@ -28,7 +29,6 @@ class DeviceSampleSource; class RemoteSinkSender; -class QThread; class RemoteSinkSink : public QObject, public ChannelSampleSink { Q_OBJECT @@ -38,8 +38,8 @@ public: virtual void feed(const SampleVector::const_iterator& begin, const SampleVector::const_iterator& end); - void startSender(); - void stopSender(); + void start(); + void stop(); void init(); void setNbTxBytes(uint32_t nbTxBytes) { m_nbTxBytes = nbTxBytes; } @@ -49,7 +49,8 @@ public: private: RemoteSinkSettings m_settings; - QThread *m_senderThread; + bool m_running; + QThread m_senderThread; RemoteSinkSender *m_remoteSinkSender; int m_txBlockIndex; //!< Current index in blocks to transmit in the Tx row @@ -67,6 +68,8 @@ private: QString m_dataAddress; uint16_t m_dataPort; + void startSender(); + void stopSender(); void setNbBlocksFEC(int nbBlocksFEC); uint32_t getNbSampleBits();