From 4f462c1b886c17e1e802e10f03076dc6ba6b1915 Mon Sep 17 00:00:00 2001 From: f4exb Date: Sun, 12 Jul 2020 02:54:11 +0200 Subject: [PATCH] LocalSource: refactored Thread to Worker object moved to thread. Equivalent to FileInput changes --- plugins/channeltx/localsource/CMakeLists.txt | 4 +- .../localsource/localsourcesource.cpp | 38 +++++++--- .../channeltx/localsource/localsourcesource.h | 9 ++- ...sourcethread.cpp => localsourceworker.cpp} | 75 ++++--------------- ...ocalsourcethread.h => localsourceworker.h} | 45 +++-------- 5 files changed, 58 insertions(+), 113 deletions(-) rename plugins/channeltx/localsource/{localsourcethread.cpp => localsourceworker.cpp} (52%) rename plugins/channeltx/localsource/{localsourcethread.h => localsourceworker.h} (68%) diff --git a/plugins/channeltx/localsource/CMakeLists.txt b/plugins/channeltx/localsource/CMakeLists.txt index 7e25980de..ead523a10 100644 --- a/plugins/channeltx/localsource/CMakeLists.txt +++ b/plugins/channeltx/localsource/CMakeLists.txt @@ -4,7 +4,7 @@ set(localsource_SOURCES localsource.cpp localsourcebaseband.cpp localsourcesource.cpp - localsourcethread.cpp + localsourceworker.cpp localsourceplugin.cpp localsourcesettings.cpp localsourcewebapiadapter.cpp @@ -14,7 +14,7 @@ set(localsource_HEADERS localsource.h localsourcebaseband.h localsourcesource.h - localsourcethread.h + localsourceworker.h localsourceplugin.h localsourcesettings.h localsourcewebapiadapter.h diff --git a/plugins/channeltx/localsource/localsourcesource.cpp b/plugins/channeltx/localsource/localsourcesource.cpp index d721845fe..5bd507a35 100644 --- a/plugins/channeltx/localsource/localsourcesource.cpp +++ b/plugins/channeltx/localsource/localsourcesource.cpp @@ -16,12 +16,12 @@ /////////////////////////////////////////////////////////////////////////////////// #include "dsp/devicesamplesink.h" -#include "localsourcethread.h" +#include "localsourceworker.h" #include "localsourcesource.h" LocalSourceSource::LocalSourceSource() : m_running(false), - m_sinkThread(nullptr) + m_sinkWorker(nullptr) {} LocalSourceSource::~LocalSourceSource() @@ -50,7 +50,8 @@ void LocalSourceSource::start(DeviceSampleSink *deviceSink) return; } - m_sinkThread = new LocalSourceThread(); + m_sinkWorker = new LocalSourceWorker(); + m_sinkWorker->moveToThread(&m_sinkWorkerThread); if (deviceSink) { @@ -59,7 +60,7 @@ void LocalSourceSource::start(DeviceSampleSink *deviceSink) m_localSamples.resize(2*m_chunkSize); m_localSamplesIndex = 0; m_localSamplesIndexOffset = m_chunkSize; - m_sinkThread->setSampleFifo(m_localSampleSourceFifo); + m_sinkWorker->setSampleFifo(m_localSampleSourceFifo); } else { @@ -68,17 +69,17 @@ void LocalSourceSource::start(DeviceSampleSink *deviceSink) connect(this, SIGNAL(pullSamples(unsigned int)), - m_sinkThread, + m_sinkWorker, SLOT(pullSamples(unsigned int)), Qt::QueuedConnection); - connect(m_sinkThread, + connect(m_sinkWorker, SIGNAL(samplesAvailable(unsigned int, unsigned int, unsigned int, unsigned int)), this, SLOT(processSamples(unsigned int, unsigned int, unsigned int, unsigned int)), Qt::QueuedConnection); - m_sinkThread->startStop(true); + startWorker(); m_running = true; } @@ -86,16 +87,31 @@ void LocalSourceSource::stop() { qDebug("LocalSourceSource::stop"); - if (m_sinkThread) + if (m_sinkWorker) { - m_sinkThread->startStop(false); - m_sinkThread->deleteLater(); - m_sinkThread = nullptr; + stopWorker(); + m_sinkWorker->deleteLater(); + m_sinkWorker = nullptr; } m_running = false; } + +void LocalSourceSource::startWorker() +{ + m_sinkWorker->startWork(); + m_sinkWorkerThread.start(); +} + +void LocalSourceSource::stopWorker() +{ + m_sinkWorker->stopWork(); + m_sinkWorkerThread.quit(); + m_sinkWorkerThread.wait(); +} + + void LocalSourceSource::pullOne(Sample& sample) { if (m_localSampleSourceFifo) diff --git a/plugins/channeltx/localsource/localsourcesource.h b/plugins/channeltx/localsource/localsourcesource.h index a42440cdc..680a2da99 100644 --- a/plugins/channeltx/localsource/localsourcesource.h +++ b/plugins/channeltx/localsource/localsourcesource.h @@ -19,13 +19,14 @@ #define PLUGINS_CHANNELTX_LOCALSOURCE_LOCALSOURCESOURCE_H_ #include +#include #include "dsp/channelsamplesource.h" #include "localsourcesettings.h" class DeviceSampleSink; class SampleSourceFifo; -class LocalSourceThread; +class LocalSourceWorker; class LocalSourceSource : public QObject, public ChannelSampleSource { Q_OBJECT @@ -46,13 +47,17 @@ signals: private: bool m_running; - LocalSourceThread *m_sinkThread; + LocalSourceWorker *m_sinkWorker; + QThread m_sinkWorkerThread; SampleSourceFifo *m_localSampleSourceFifo; int m_chunkSize; SampleVector m_localSamples; int m_localSamplesIndex; int m_localSamplesIndexOffset; + void startWorker(); + void stopWorker(); + private slots: void processSamples(unsigned int iPart1Begin, unsigned int iPart1End, unsigned int iPart2Begin, unsigned int iPart2End); }; diff --git a/plugins/channeltx/localsource/localsourcethread.cpp b/plugins/channeltx/localsource/localsourceworker.cpp similarity index 52% rename from plugins/channeltx/localsource/localsourcethread.cpp rename to plugins/channeltx/localsource/localsourceworker.cpp index 195c85b51..06b0664cb 100644 --- a/plugins/channeltx/localsource/localsourcethread.cpp +++ b/plugins/channeltx/localsource/localsourceworker.cpp @@ -17,94 +17,45 @@ #include "dsp/samplesourcefifo.h" -#include "localsourcethread.h" +#include "localsourceworker.h" -MESSAGE_CLASS_DEFINITION(LocalSourceThread::MsgStartStop, Message) - -LocalSourceThread::LocalSourceThread(QObject* parent) : - QThread(parent), +LocalSourceWorker::LocalSourceWorker(QObject* parent) : + QObject(parent), m_running(false), m_sampleFifo(nullptr) { connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); } -LocalSourceThread::~LocalSourceThread() +LocalSourceWorker::~LocalSourceWorker() { - qDebug("LocalSourceThread::~LocalSourceThread"); + qDebug("LocalSourceWorker::~LocalSourceWorker"); } -void LocalSourceThread::startStop(bool start) -{ - MsgStartStop *msg = MsgStartStop::create(start); - m_inputMessageQueue.push(msg); -} - -void LocalSourceThread::setSampleFifo(SampleSourceFifo *sampleFifo) +void LocalSourceWorker::setSampleFifo(SampleSourceFifo *sampleFifo) { m_sampleFifo = sampleFifo; } -void LocalSourceThread::pullSamples(unsigned int count) +void LocalSourceWorker::pullSamples(unsigned int count) { unsigned int iPart1Begin, iPart1End, iPart2Begin, iPart2End; m_sampleFifo->read(count, iPart1Begin, iPart1End, iPart2Begin, iPart2End); emit samplesAvailable(iPart1Begin, iPart1End, iPart2Begin, iPart2End); } -void LocalSourceThread::startWork() +void LocalSourceWorker::startWork() { - qDebug("LocalSourceThread::startWork"); - m_startWaitMutex.lock(); - start(); - - while(!m_running) { - m_startWaiter.wait(&m_startWaitMutex, 100); - } - - m_startWaitMutex.unlock(); + qDebug("LocalSourceWorker::startWork"); + m_running = true; } -void LocalSourceThread::stopWork() +void LocalSourceWorker::stopWork() { - qDebug("LocalSourceThread::stopWork"); + qDebug("LocalSourceWorker::stopWork"); m_running = false; - wait(); } -void LocalSourceThread::run() +void LocalSourceWorker::handleInputMessages() { - qDebug("LocalSinkThread::run: begin"); - m_running = true; - m_startWaiter.wakeAll(); - - while (m_running) - { - sleep(1); // Do nothing as everything is in the data handler (dequeuer) - } - - m_running = false; - qDebug("LocalSinkThread::run: end"); -} - -void LocalSourceThread::handleInputMessages() -{ - Message* message; - - while ((message = m_inputMessageQueue.pop()) != 0) - { - if (MsgStartStop::match(*message)) - { - MsgStartStop* notif = (MsgStartStop*) message; - qDebug("LocalSourceThread::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop"); - - if (notif->getStartStop()) { - startWork(); - } else { - stopWork(); - } - - delete message; - } - } } diff --git a/plugins/channeltx/localsource/localsourcethread.h b/plugins/channeltx/localsource/localsourceworker.h similarity index 68% rename from plugins/channeltx/localsource/localsourcethread.h rename to plugins/channeltx/localsource/localsourceworker.h index ff5a7b54e..5affce925 100644 --- a/plugins/channeltx/localsource/localsourcethread.h +++ b/plugins/channeltx/localsource/localsourceworker.h @@ -15,12 +15,10 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// -#ifndef PLUGINS_CHANNELTX_LOCALOURCE_LOCALOURCETHREAD_H_ -#define PLUGINS_CHANNELTX_LOCALOURCE_LOCALOURCETHREAD_H_ +#ifndef PLUGINS_CHANNELTX_LOCALSOURCE_LOCALOURCEWORKER_H_ +#define PLUGINS_CHANNELTX_LOCALSOURCE_LOCALOURCEWORKER_H_ -#include -#include -#include +#include #include "dsp/dsptypes.h" #include "util/message.h" @@ -28,33 +26,15 @@ class SampleSourceFifo; -class LocalSourceThread : public QThread { +class LocalSourceWorker : public QObject { Q_OBJECT public: - class MsgStartStop : public Message { - MESSAGE_CLASS_DECLARATION + LocalSourceWorker(QObject* parent = nullptr); + ~LocalSourceWorker(); - public: - bool getStartStop() const { return m_startStop; } - - static MsgStartStop* create(bool startStop) { - return new MsgStartStop(startStop); - } - - protected: - bool m_startStop; - - MsgStartStop(bool startStop) : - Message(), - m_startStop(startStop) - { } - }; - - LocalSourceThread(QObject* parent = nullptr); - ~LocalSourceThread(); - - void startStop(bool start); + void startWork(); + void stopWork(); void setSampleFifo(SampleSourceFifo *sampleFifo); public slots: @@ -64,20 +44,13 @@ signals: void samplesAvailable(unsigned int iPart1Begin, unsigned int iPart1End, unsigned int iPart2Begin, unsigned int iPart2End); private: - QMutex m_startWaitMutex; - QWaitCondition m_startWaiter; volatile bool m_running; SampleSourceFifo *m_sampleFifo; - MessageQueue m_inputMessageQueue; - void startWork(); - void stopWork(); - void run(); - private slots: void handleInputMessages(); }; -#endif // PLUGINS_CHANNELTX_LOCALSINK_LOCALSINKTHREAD_H_ +#endif // PLUGINS_CHANNELTX_LOCALSOURCE_LOCALOURCEWORKER_H_