1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2025-05-29 05:22:25 -04:00

LocalSink: refactored LocalSinkThread to LocalSinkWorker object moved to thread. Equivalent to FileInput changes

This commit is contained in:
f4exb 2020-07-11 10:37:33 +02:00
parent 7cddf7ce0b
commit 141d3fa03d
5 changed files with 59 additions and 66 deletions

View File

@ -6,7 +6,7 @@ set(localsink_SOURCES
localsinksink.cpp localsinksink.cpp
localsinksettings.cpp localsinksettings.cpp
localsinkwebapiadapter.cpp localsinkwebapiadapter.cpp
localsinkthread.cpp localsinkworker.cpp
localsinkplugin.cpp localsinkplugin.cpp
) )
@ -16,7 +16,7 @@ set(localsink_HEADERS
localsinksink.h localsinksink.h
localsinksettings.h localsinksettings.h
localsinkwebapiadapter.h localsinkwebapiadapter.h
localsinkthread.h localsinkworker.h
localsinkplugin.h localsinkplugin.h
) )

View File

@ -22,11 +22,11 @@
#include "dsp/devicesamplesource.h" #include "dsp/devicesamplesource.h"
#include "dsp/hbfilterchainconverter.h" #include "dsp/hbfilterchainconverter.h"
#include "localsinkthread.h" #include "localsinkworker.h"
#include "localsinksink.h" #include "localsinksink.h"
LocalSinkSink::LocalSinkSink() : LocalSinkSink::LocalSinkSink() :
m_sinkThread(nullptr), m_sinkWorker(nullptr),
m_running(false), m_running(false),
m_centerFrequency(0), m_centerFrequency(0),
m_frequencyOffset(0), m_frequencyOffset(0),
@ -54,22 +54,23 @@ void LocalSinkSink::start(DeviceSampleSource *deviceSource)
stop(); stop();
} }
m_sinkThread = new LocalSinkThread(); m_sinkWorker = new LocalSinkWorker();
m_sinkThread->setSampleFifo(&m_sampleFifo); m_sinkWorker->moveToThread(&m_sinkWorkerThread);
m_sinkWorker->setSampleFifo(&m_sampleFifo);
if (deviceSource) { if (deviceSource) {
m_sinkThread->setDeviceSampleFifo(deviceSource->getSampleFifo()); m_sinkWorker->setDeviceSampleFifo(deviceSource->getSampleFifo());
} }
QObject::connect( QObject::connect(
&m_sampleFifo, &m_sampleFifo,
&SampleSinkFifo::dataReady, &SampleSinkFifo::dataReady,
m_sinkThread, m_sinkWorker,
&LocalSinkThread::handleData, &LocalSinkWorker::handleData,
Qt::QueuedConnection Qt::QueuedConnection
); );
m_sinkThread->startStop(true); startWorker();
m_running = true; m_running = true;
} }
@ -80,20 +81,33 @@ void LocalSinkSink::stop()
QObject::disconnect( QObject::disconnect(
&m_sampleFifo, &m_sampleFifo,
&SampleSinkFifo::dataReady, &SampleSinkFifo::dataReady,
m_sinkThread, m_sinkWorker,
&LocalSinkThread::handleData &LocalSinkWorker::handleData
); );
if (m_sinkThread != 0) if (m_sinkWorker != 0)
{ {
m_sinkThread->startStop(false); stopWorker();
m_sinkThread->deleteLater(); m_sinkWorker->deleteLater();
m_sinkThread = 0; m_sinkWorker = nullptr;
} }
m_running = false; m_running = false;
} }
void LocalSinkSink::startWorker()
{
m_sinkWorker->startStop(true);
m_sinkWorkerThread.start();
}
void LocalSinkSink::stopWorker()
{
m_sinkWorker->startStop(false);
m_sinkWorkerThread.quit();
m_sinkWorkerThread.wait();
}
void LocalSinkSink::applySettings(const LocalSinkSettings& settings, bool force) void LocalSinkSink::applySettings(const LocalSinkSettings& settings, bool force)
{ {
qDebug() << "LocalSinkSink::applySettings:" qDebug() << "LocalSinkSink::applySettings:"

View File

@ -19,13 +19,14 @@
#define INCLUDE_LOCALSINKSINK_H_ #define INCLUDE_LOCALSINKSINK_H_
#include <QObject> #include <QObject>
#include <QThread>
#include "dsp/channelsamplesink.h" #include "dsp/channelsamplesink.h"
#include "localsinksettings.h" #include "localsinksettings.h"
class DeviceSampleSource; class DeviceSampleSource;
class LocalSinkThread; class LocalSinkWorker;
class LocalSinkSink : public QObject, public ChannelSampleSink { class LocalSinkSink : public QObject, public ChannelSampleSink {
Q_OBJECT Q_OBJECT
@ -44,7 +45,8 @@ public:
private: private:
SampleSinkFifo m_sampleFifo; SampleSinkFifo m_sampleFifo;
LocalSinkSettings m_settings; LocalSinkSettings m_settings;
LocalSinkThread *m_sinkThread; LocalSinkWorker *m_sinkWorker;
QThread m_sinkWorkerThread;
bool m_running; bool m_running;
uint64_t m_centerFrequency; uint64_t m_centerFrequency;
@ -52,6 +54,8 @@ private:
uint32_t m_sampleRate; uint32_t m_sampleRate;
uint32_t m_deviceSampleRate; uint32_t m_deviceSampleRate;
void startWorker();
void stopWorker();
}; };
#endif // INCLUDE_LOCALSINKSINK_H_ #endif // INCLUDE_LOCALSINKSINK_H_

View File

@ -17,62 +17,41 @@
#include "dsp/samplesinkfifo.h" #include "dsp/samplesinkfifo.h"
#include "localsinkthread.h" #include "localsinkworker.h"
MESSAGE_CLASS_DEFINITION(LocalSinkThread::MsgStartStop, Message) MESSAGE_CLASS_DEFINITION(LocalSinkWorker::MsgStartStop, Message)
LocalSinkThread::LocalSinkThread(QObject* parent) : LocalSinkWorker::LocalSinkWorker(QObject* parent) :
QThread(parent), QObject(parent),
m_running(false), m_running(false),
m_sampleFifo(0) m_sampleFifo(0)
{ {
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
} }
LocalSinkThread::~LocalSinkThread() LocalSinkWorker::~LocalSinkWorker()
{ {
qDebug("LocalSinkThread::~LocalSinkThread"); qDebug("LocalSinkWorker::~LocalSinkWorker");
} }
void LocalSinkThread::startStop(bool start) void LocalSinkWorker::startStop(bool start)
{ {
MsgStartStop *msg = MsgStartStop::create(start); MsgStartStop *msg = MsgStartStop::create(start);
m_inputMessageQueue.push(msg); m_inputMessageQueue.push(msg);
} }
void LocalSinkThread::startWork() void LocalSinkWorker::startWork()
{ {
qDebug("LocalSinkThread::startWork"); qDebug("LocalSinkWorker::startWork");
m_startWaitMutex.lock(); m_running = true;
start();
while(!m_running)
m_startWaiter.wait(&m_startWaitMutex, 100);
m_startWaitMutex.unlock();
} }
void LocalSinkThread::stopWork() void LocalSinkWorker::stopWork()
{ {
qDebug("LocalSinkThread::stopWork");
m_running = false; m_running = false;
wait();
} }
void LocalSinkThread::run() void LocalSinkWorker::handleData()
{
qDebug("LocalSinkThread::run: begin");
m_running = true;
m_startWaiter.wakeAll();
while (m_running)
{
sleep(1); // Do nothing as everything is in the data handler (dequeuer)
}
m_running = false;
qDebug("LocalSinkThread::run: end");
}
void LocalSinkThread::handleData()
{ {
while ((m_sampleFifo->fill() > 0) && (m_inputMessageQueue.size() == 0)) while ((m_sampleFifo->fill() > 0) && (m_inputMessageQueue.size() == 0))
{ {
@ -100,7 +79,7 @@ void LocalSinkThread::handleData()
} }
} }
void LocalSinkThread::handleInputMessages() void LocalSinkWorker::handleInputMessages()
{ {
Message* message; Message* message;
@ -109,7 +88,7 @@ void LocalSinkThread::handleInputMessages()
if (MsgStartStop::match(*message)) if (MsgStartStop::match(*message))
{ {
MsgStartStop* notif = (MsgStartStop*) message; MsgStartStop* notif = (MsgStartStop*) message;
qDebug("LocalSinkThread::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop"); qDebug("LocalSinkWorker::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop");
if (notif->getStartStop()) { if (notif->getStartStop()) {
startWork(); startWork();

View File

@ -15,10 +15,10 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>. // // along with this program. If not, see <http://www.gnu.org/licenses/>. //
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
#ifndef PLUGINS_CHANNELRX_LOCALSINK_LOCALSINKTHREAD_H_ #ifndef PLUGINS_CHANNELRX_LOCALSINK_LOCALSINKWORKER_H_
#define PLUGINS_CHANNELRX_LOCALSINK_LOCALSINKTHREAD_H_ #define PLUGINS_CHANNELRX_LOCALSINK_LOCALSINKWORKER_H_
#include <QThread> #include <QObject>
#include <QMutex> #include <QMutex>
#include <QWaitCondition> #include <QWaitCondition>
@ -28,7 +28,7 @@
class SampleSinkFifo; class SampleSinkFifo;
class LocalSinkThread : public QThread { class LocalSinkWorker : public QObject {
Q_OBJECT Q_OBJECT
public: public:
@ -51,8 +51,8 @@ public:
{ } { }
}; };
LocalSinkThread(QObject* parent = 0); LocalSinkWorker(QObject* parent = 0);
~LocalSinkThread(); ~LocalSinkWorker();
void startStop(bool start); void startStop(bool start);
void setSampleFifo(SampleSinkFifo *sampleFifo) { m_sampleFifo = sampleFifo; } void setSampleFifo(SampleSinkFifo *sampleFifo) { m_sampleFifo = sampleFifo; }
@ -62,21 +62,17 @@ public slots:
void handleData(); //!< Handle data when samples have to be processed void handleData(); //!< Handle data when samples have to be processed
private: private:
QMutex m_startWaitMutex;
QWaitCondition m_startWaiter;
volatile bool m_running; volatile bool m_running;
SampleSinkFifo *m_sampleFifo; SampleSinkFifo *m_sampleFifo;
SampleSinkFifo *m_deviceSampleFifo; SampleSinkFifo *m_deviceSampleFifo;
MessageQueue m_inputMessageQueue; MessageQueue m_inputMessageQueue;
void startWork(); void startWork();
void stopWork(); void stopWork();
void run();
private slots: private slots:
void handleInputMessages(); void handleInputMessages();
}; };
#endif // PLUGINS_CHANNELRX_LOCALSINK_LOCALSINKTHREAD_H_ #endif // PLUGINS_CHANNELRX_LOCALSINK_LOCALSINKWORKER_H_