1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2024-11-17 13:51:47 -05:00

LocalSource: refactored Thread to Worker object moved to thread. Equivalent to FileInput changes

This commit is contained in:
f4exb 2020-07-12 02:54:11 +02:00
parent b8681d59a9
commit 4f462c1b88
5 changed files with 58 additions and 113 deletions

View File

@ -4,7 +4,7 @@ set(localsource_SOURCES
localsource.cpp localsource.cpp
localsourcebaseband.cpp localsourcebaseband.cpp
localsourcesource.cpp localsourcesource.cpp
localsourcethread.cpp localsourceworker.cpp
localsourceplugin.cpp localsourceplugin.cpp
localsourcesettings.cpp localsourcesettings.cpp
localsourcewebapiadapter.cpp localsourcewebapiadapter.cpp
@ -14,7 +14,7 @@ set(localsource_HEADERS
localsource.h localsource.h
localsourcebaseband.h localsourcebaseband.h
localsourcesource.h localsourcesource.h
localsourcethread.h localsourceworker.h
localsourceplugin.h localsourceplugin.h
localsourcesettings.h localsourcesettings.h
localsourcewebapiadapter.h localsourcewebapiadapter.h

View File

@ -16,12 +16,12 @@
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
#include "dsp/devicesamplesink.h" #include "dsp/devicesamplesink.h"
#include "localsourcethread.h" #include "localsourceworker.h"
#include "localsourcesource.h" #include "localsourcesource.h"
LocalSourceSource::LocalSourceSource() : LocalSourceSource::LocalSourceSource() :
m_running(false), m_running(false),
m_sinkThread(nullptr) m_sinkWorker(nullptr)
{} {}
LocalSourceSource::~LocalSourceSource() LocalSourceSource::~LocalSourceSource()
@ -50,7 +50,8 @@ void LocalSourceSource::start(DeviceSampleSink *deviceSink)
return; return;
} }
m_sinkThread = new LocalSourceThread(); m_sinkWorker = new LocalSourceWorker();
m_sinkWorker->moveToThread(&m_sinkWorkerThread);
if (deviceSink) if (deviceSink)
{ {
@ -59,7 +60,7 @@ void LocalSourceSource::start(DeviceSampleSink *deviceSink)
m_localSamples.resize(2*m_chunkSize); m_localSamples.resize(2*m_chunkSize);
m_localSamplesIndex = 0; m_localSamplesIndex = 0;
m_localSamplesIndexOffset = m_chunkSize; m_localSamplesIndexOffset = m_chunkSize;
m_sinkThread->setSampleFifo(m_localSampleSourceFifo); m_sinkWorker->setSampleFifo(m_localSampleSourceFifo);
} }
else else
{ {
@ -68,17 +69,17 @@ void LocalSourceSource::start(DeviceSampleSink *deviceSink)
connect(this, connect(this,
SIGNAL(pullSamples(unsigned int)), SIGNAL(pullSamples(unsigned int)),
m_sinkThread, m_sinkWorker,
SLOT(pullSamples(unsigned int)), SLOT(pullSamples(unsigned int)),
Qt::QueuedConnection); Qt::QueuedConnection);
connect(m_sinkThread, connect(m_sinkWorker,
SIGNAL(samplesAvailable(unsigned int, unsigned int, unsigned int, unsigned int)), SIGNAL(samplesAvailable(unsigned int, unsigned int, unsigned int, unsigned int)),
this, this,
SLOT(processSamples(unsigned int, unsigned int, unsigned int, unsigned int)), SLOT(processSamples(unsigned int, unsigned int, unsigned int, unsigned int)),
Qt::QueuedConnection); Qt::QueuedConnection);
m_sinkThread->startStop(true); startWorker();
m_running = true; m_running = true;
} }
@ -86,16 +87,31 @@ void LocalSourceSource::stop()
{ {
qDebug("LocalSourceSource::stop"); qDebug("LocalSourceSource::stop");
if (m_sinkThread) if (m_sinkWorker)
{ {
m_sinkThread->startStop(false); stopWorker();
m_sinkThread->deleteLater(); m_sinkWorker->deleteLater();
m_sinkThread = nullptr; m_sinkWorker = nullptr;
} }
m_running = false; m_running = false;
} }
void LocalSourceSource::startWorker()
{
m_sinkWorker->startWork();
m_sinkWorkerThread.start();
}
void LocalSourceSource::stopWorker()
{
m_sinkWorker->stopWork();
m_sinkWorkerThread.quit();
m_sinkWorkerThread.wait();
}
void LocalSourceSource::pullOne(Sample& sample) void LocalSourceSource::pullOne(Sample& sample)
{ {
if (m_localSampleSourceFifo) if (m_localSampleSourceFifo)

View File

@ -19,13 +19,14 @@
#define PLUGINS_CHANNELTX_LOCALSOURCE_LOCALSOURCESOURCE_H_ #define PLUGINS_CHANNELTX_LOCALSOURCE_LOCALSOURCESOURCE_H_
#include <QObject> #include <QObject>
#include <QThread>
#include "dsp/channelsamplesource.h" #include "dsp/channelsamplesource.h"
#include "localsourcesettings.h" #include "localsourcesettings.h"
class DeviceSampleSink; class DeviceSampleSink;
class SampleSourceFifo; class SampleSourceFifo;
class LocalSourceThread; class LocalSourceWorker;
class LocalSourceSource : public QObject, public ChannelSampleSource { class LocalSourceSource : public QObject, public ChannelSampleSource {
Q_OBJECT Q_OBJECT
@ -46,13 +47,17 @@ signals:
private: private:
bool m_running; bool m_running;
LocalSourceThread *m_sinkThread; LocalSourceWorker *m_sinkWorker;
QThread m_sinkWorkerThread;
SampleSourceFifo *m_localSampleSourceFifo; SampleSourceFifo *m_localSampleSourceFifo;
int m_chunkSize; int m_chunkSize;
SampleVector m_localSamples; SampleVector m_localSamples;
int m_localSamplesIndex; int m_localSamplesIndex;
int m_localSamplesIndexOffset; int m_localSamplesIndexOffset;
void startWorker();
void stopWorker();
private slots: private slots:
void processSamples(unsigned int iPart1Begin, unsigned int iPart1End, unsigned int iPart2Begin, unsigned int iPart2End); void processSamples(unsigned int iPart1Begin, unsigned int iPart1End, unsigned int iPart2Begin, unsigned int iPart2End);
}; };

View File

@ -17,94 +17,45 @@
#include "dsp/samplesourcefifo.h" #include "dsp/samplesourcefifo.h"
#include "localsourcethread.h" #include "localsourceworker.h"
MESSAGE_CLASS_DEFINITION(LocalSourceThread::MsgStartStop, Message) LocalSourceWorker::LocalSourceWorker(QObject* parent) :
QObject(parent),
LocalSourceThread::LocalSourceThread(QObject* parent) :
QThread(parent),
m_running(false), m_running(false),
m_sampleFifo(nullptr) m_sampleFifo(nullptr)
{ {
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
} }
LocalSourceThread::~LocalSourceThread() LocalSourceWorker::~LocalSourceWorker()
{ {
qDebug("LocalSourceThread::~LocalSourceThread"); qDebug("LocalSourceWorker::~LocalSourceWorker");
} }
void LocalSourceThread::startStop(bool start) void LocalSourceWorker::setSampleFifo(SampleSourceFifo *sampleFifo)
{
MsgStartStop *msg = MsgStartStop::create(start);
m_inputMessageQueue.push(msg);
}
void LocalSourceThread::setSampleFifo(SampleSourceFifo *sampleFifo)
{ {
m_sampleFifo = sampleFifo; m_sampleFifo = sampleFifo;
} }
void LocalSourceThread::pullSamples(unsigned int count) void LocalSourceWorker::pullSamples(unsigned int count)
{ {
unsigned int iPart1Begin, iPart1End, iPart2Begin, iPart2End; unsigned int iPart1Begin, iPart1End, iPart2Begin, iPart2End;
m_sampleFifo->read(count, iPart1Begin, iPart1End, iPart2Begin, iPart2End); m_sampleFifo->read(count, iPart1Begin, iPart1End, iPart2Begin, iPart2End);
emit samplesAvailable(iPart1Begin, iPart1End, iPart2Begin, iPart2End); emit samplesAvailable(iPart1Begin, iPart1End, iPart2Begin, iPart2End);
} }
void LocalSourceThread::startWork() void LocalSourceWorker::startWork()
{ {
qDebug("LocalSourceThread::startWork"); qDebug("LocalSourceWorker::startWork");
m_startWaitMutex.lock(); m_running = true;
start();
while(!m_running) {
m_startWaiter.wait(&m_startWaitMutex, 100);
}
m_startWaitMutex.unlock();
} }
void LocalSourceThread::stopWork() void LocalSourceWorker::stopWork()
{ {
qDebug("LocalSourceThread::stopWork"); qDebug("LocalSourceWorker::stopWork");
m_running = false; m_running = false;
wait();
} }
void LocalSourceThread::run() void LocalSourceWorker::handleInputMessages()
{ {
qDebug("LocalSinkThread::run: begin");
m_running = true;
m_startWaiter.wakeAll();
while (m_running)
{
sleep(1); // Do nothing as everything is in the data handler (dequeuer)
}
m_running = false;
qDebug("LocalSinkThread::run: end");
}
void LocalSourceThread::handleInputMessages()
{
Message* message;
while ((message = m_inputMessageQueue.pop()) != 0)
{
if (MsgStartStop::match(*message))
{
MsgStartStop* notif = (MsgStartStop*) message;
qDebug("LocalSourceThread::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop");
if (notif->getStartStop()) {
startWork();
} else {
stopWork();
}
delete message;
}
}
} }

View File

@ -15,12 +15,10 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>. // // along with this program. If not, see <http://www.gnu.org/licenses/>. //
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
#ifndef PLUGINS_CHANNELTX_LOCALOURCE_LOCALOURCETHREAD_H_ #ifndef PLUGINS_CHANNELTX_LOCALSOURCE_LOCALOURCEWORKER_H_
#define PLUGINS_CHANNELTX_LOCALOURCE_LOCALOURCETHREAD_H_ #define PLUGINS_CHANNELTX_LOCALSOURCE_LOCALOURCEWORKER_H_
#include <QThread> #include <QObject>
#include <QMutex>
#include <QWaitCondition>
#include "dsp/dsptypes.h" #include "dsp/dsptypes.h"
#include "util/message.h" #include "util/message.h"
@ -28,33 +26,15 @@
class SampleSourceFifo; class SampleSourceFifo;
class LocalSourceThread : public QThread { class LocalSourceWorker : public QObject {
Q_OBJECT Q_OBJECT
public: public:
class MsgStartStop : public Message { LocalSourceWorker(QObject* parent = nullptr);
MESSAGE_CLASS_DECLARATION ~LocalSourceWorker();
public: void startWork();
bool getStartStop() const { return m_startStop; } void stopWork();
static MsgStartStop* create(bool startStop) {
return new MsgStartStop(startStop);
}
protected:
bool m_startStop;
MsgStartStop(bool startStop) :
Message(),
m_startStop(startStop)
{ }
};
LocalSourceThread(QObject* parent = nullptr);
~LocalSourceThread();
void startStop(bool start);
void setSampleFifo(SampleSourceFifo *sampleFifo); void setSampleFifo(SampleSourceFifo *sampleFifo);
public slots: public slots:
@ -64,20 +44,13 @@ signals:
void samplesAvailable(unsigned int iPart1Begin, unsigned int iPart1End, unsigned int iPart2Begin, unsigned int iPart2End); void samplesAvailable(unsigned int iPart1Begin, unsigned int iPart1End, unsigned int iPart2Begin, unsigned int iPart2End);
private: private:
QMutex m_startWaitMutex;
QWaitCondition m_startWaiter;
volatile bool m_running; volatile bool m_running;
SampleSourceFifo *m_sampleFifo; SampleSourceFifo *m_sampleFifo;
MessageQueue m_inputMessageQueue; MessageQueue m_inputMessageQueue;
void startWork();
void stopWork();
void run();
private slots: private slots:
void handleInputMessages(); void handleInputMessages();
}; };
#endif // PLUGINS_CHANNELTX_LOCALSINK_LOCALSINKTHREAD_H_ #endif // PLUGINS_CHANNELTX_LOCALSOURCE_LOCALOURCEWORKER_H_