1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2025-06-25 05:25:27 -04:00

File Input: refactored FileInputThread to FileInputWorker object moved to thread (contribution from Vort). Fixes #562

This commit is contained in:
f4exb 2020-07-11 06:45:16 +02:00
parent 252156ccbf
commit c75c35acad
5 changed files with 67 additions and 70 deletions

View File

@ -3,7 +3,7 @@ project(fileinput)
set(fileinput_SOURCES set(fileinput_SOURCES
fileinput.cpp fileinput.cpp
fileinputplugin.cpp fileinputplugin.cpp
fileinputthread.cpp fileinputworker.cpp
fileinputsettings.cpp fileinputsettings.cpp
fileinputwebapiadapter.cpp fileinputwebapiadapter.cpp
) )
@ -11,7 +11,7 @@ set(fileinput_SOURCES
set(fileinput_HEADERS set(fileinput_HEADERS
fileinput.h fileinput.h
fileinputplugin.h fileinputplugin.h
fileinputthread.h fileinputworker.h
fileinputsettings.h fileinputsettings.h
fileinputwebapiadapter.h fileinputwebapiadapter.h
) )

View File

@ -35,7 +35,7 @@
#include "device/deviceapi.h" #include "device/deviceapi.h"
#include "fileinput.h" #include "fileinput.h"
#include "fileinputthread.h" #include "fileinputworker.h"
MESSAGE_CLASS_DEFINITION(FileInput::MsgConfigureFileInput, Message) MESSAGE_CLASS_DEFINITION(FileInput::MsgConfigureFileInput, Message)
MESSAGE_CLASS_DEFINITION(FileInput::MsgConfigureFileSourceName, Message) MESSAGE_CLASS_DEFINITION(FileInput::MsgConfigureFileSourceName, Message)
@ -52,7 +52,7 @@ MESSAGE_CLASS_DEFINITION(FileInput::MsgReportHeaderCRC, Message)
FileInput::FileInput(DeviceAPI *deviceAPI) : FileInput::FileInput(DeviceAPI *deviceAPI) :
m_deviceAPI(deviceAPI), m_deviceAPI(deviceAPI),
m_settings(), m_settings(),
m_fileInputThread(nullptr), m_fileInputWorker(nullptr),
m_deviceDescription(), m_deviceDescription(),
m_fileName("..."), m_fileName("..."),
m_sampleRate(0), m_sampleRate(0),
@ -157,10 +157,10 @@ void FileInput::seekFileStream(int seekMillis)
{ {
QMutexLocker mutexLocker(&m_mutex); QMutexLocker mutexLocker(&m_mutex);
if ((m_ifstream.is_open()) && m_fileInputThread && !m_fileInputThread->isRunning()) if ((m_ifstream.is_open()) && m_fileInputWorker && !m_fileInputWorker->isRunning())
{ {
quint64 seekPoint = ((m_recordLength * seekMillis) / 1000) * m_sampleRate; quint64 seekPoint = ((m_recordLength * seekMillis) / 1000) * m_sampleRate;
m_fileInputThread->setSamplesCount(seekPoint); m_fileInputWorker->setSamplesCount(seekPoint);
seekPoint *= (m_sampleSize == 24 ? 8 : 4); // + sizeof(FileSink::Header) seekPoint *= (m_sampleSize == 24 ? 8 : 4); // + sizeof(FileSink::Header)
m_ifstream.clear(); m_ifstream.clear();
m_ifstream.seekg(seekPoint + sizeof(FileRecord::Header), std::ios::beg); m_ifstream.seekg(seekPoint + sizeof(FileRecord::Header), std::ios::beg);
@ -194,9 +194,11 @@ bool FileInput::start()
return false; return false;
} }
m_fileInputThread = new FileInputThread(&m_ifstream, &m_sampleFifo, m_masterTimer, &m_inputMessageQueue); m_fileInputWorker = new FileInputWorker(&m_ifstream, &m_sampleFifo, m_masterTimer, &m_inputMessageQueue);
m_fileInputThread->setSampleRateAndSize(m_settings.m_accelerationFactor * m_sampleRate, m_sampleSize); // Fast Forward: 1 corresponds to live. 1/2 is half speed, 2 is double speed m_fileInputWorker->moveToThread(&m_fileInputWorkerThread);
m_fileInputThread->startWork(); m_fileInputWorker->setSampleRateAndSize(m_settings.m_accelerationFactor * m_sampleRate, m_sampleSize); // Fast Forward: 1 corresponds to live. 1/2 is half speed, 2 is double speed
startWorker();
m_deviceDescription = "FileInput"; m_deviceDescription = "FileInput";
mutexLocker.unlock(); mutexLocker.unlock();
@ -215,11 +217,11 @@ void FileInput::stop()
qDebug() << "FileInput::stop"; qDebug() << "FileInput::stop";
QMutexLocker mutexLocker(&m_mutex); QMutexLocker mutexLocker(&m_mutex);
if (m_fileInputThread) if (m_fileInputWorker)
{ {
m_fileInputThread->stopWork(); stopWorker();
delete m_fileInputThread; delete m_fileInputWorker;
m_fileInputThread = nullptr; m_fileInputWorker = nullptr;
} }
m_deviceDescription.clear(); m_deviceDescription.clear();
@ -230,6 +232,20 @@ void FileInput::stop()
} }
} }
void FileInput::startWorker()
{
m_fileInputWorker->startWork();
m_fileInputWorkerThread.start();
}
void FileInput::stopWorker()
{
m_fileInputWorker->stopWork();
m_fileInputWorkerThread.quit();
m_fileInputWorkerThread.wait();
}
QByteArray FileInput::serialize() const QByteArray FileInput::serialize() const
{ {
return m_settings.serialize(); return m_settings.serialize();
@ -313,12 +329,12 @@ bool FileInput::handleMessage(const Message& message)
MsgConfigureFileInputWork& conf = (MsgConfigureFileInputWork&) message; MsgConfigureFileInputWork& conf = (MsgConfigureFileInputWork&) message;
bool working = conf.isWorking(); bool working = conf.isWorking();
if (m_fileInputThread != 0) if (m_fileInputWorker != 0)
{ {
if (working) { if (working) {
m_fileInputThread->startWork(); startWorker();
} else { } else {
m_fileInputThread->stopWork(); stopWorker();
} }
} }
@ -336,11 +352,11 @@ bool FileInput::handleMessage(const Message& message)
{ {
MsgReportFileInputStreamTiming *report; MsgReportFileInputStreamTiming *report;
if (m_fileInputThread != 0) if (m_fileInputWorker != 0)
{ {
if (getMessageQueueToGUI()) if (getMessageQueueToGUI())
{ {
report = MsgReportFileInputStreamTiming::create(m_fileInputThread->getSamplesCount()); report = MsgReportFileInputStreamTiming::create(m_fileInputWorker->getSamplesCount());
getMessageQueueToGUI()->push(report); getMessageQueueToGUI()->push(report);
} }
} }
@ -370,21 +386,21 @@ bool FileInput::handleMessage(const Message& message)
return true; return true;
} }
else if (FileInputThread::MsgReportEOF::match(message)) else if (FileInputWorker::MsgReportEOF::match(message))
{ {
qDebug() << "FileInput::handleMessage: MsgReportEOF"; qDebug() << "FileInput::handleMessage: MsgReportEOF";
m_fileInputThread->stopWork(); stopWorker();
if (getMessageQueueToGUI()) if (getMessageQueueToGUI())
{ {
MsgReportFileInputStreamTiming *report = MsgReportFileInputStreamTiming::create(m_fileInputThread->getSamplesCount()); MsgReportFileInputStreamTiming *report = MsgReportFileInputStreamTiming::create(m_fileInputWorker->getSamplesCount());
getMessageQueueToGUI()->push(report); getMessageQueueToGUI()->push(report);
} }
if (m_settings.m_loop) if (m_settings.m_loop)
{ {
seekFileStream(0); seekFileStream(0);
m_fileInputThread->startWork(); startWorker();
} }
else else
{ {
@ -415,14 +431,14 @@ bool FileInput::applySettings(const FileInputSettings& settings, bool force)
{ {
reverseAPIKeys.append("accelerationFactor"); reverseAPIKeys.append("accelerationFactor");
if (m_fileInputThread) if (m_fileInputWorker)
{ {
QMutexLocker mutexLocker(&m_mutex); QMutexLocker mutexLocker(&m_mutex);
if (!m_sampleFifo.setSize(m_settings.m_accelerationFactor * m_sampleRate * sizeof(Sample))) { if (!m_sampleFifo.setSize(m_settings.m_accelerationFactor * m_sampleRate * sizeof(Sample))) {
qCritical("FileInput::applySettings: could not reallocate sample FIFO size to %lu", qCritical("FileInput::applySettings: could not reallocate sample FIFO size to %lu",
m_settings.m_accelerationFactor * m_sampleRate * sizeof(Sample)); m_settings.m_accelerationFactor * m_sampleRate * sizeof(Sample));
} }
m_fileInputThread->setSampleRateAndSize(settings.m_accelerationFactor * m_sampleRate, m_sampleSize); // Fast Forward: 1 corresponds to live. 1/2 is half speed, 2 is double speed m_fileInputWorker->setSampleRateAndSize(settings.m_accelerationFactor * m_sampleRate, m_sampleSize); // Fast Forward: 1 corresponds to live. 1/2 is half speed, 2 is double speed
} }
} }
@ -571,8 +587,8 @@ void FileInput::webapiFormatDeviceReport(SWGSDRangel::SWGDeviceReport& response)
qint64 t_msec = 0; qint64 t_msec = 0;
quint64 samplesCount = 0; quint64 samplesCount = 0;
if (m_fileInputThread) { if (m_fileInputWorker) {
samplesCount = m_fileInputThread->getSamplesCount(); samplesCount = m_fileInputWorker->getSamplesCount();
} }
if (m_sampleRate > 0) if (m_sampleRate > 0)

View File

@ -25,6 +25,8 @@
#include <QString> #include <QString>
#include <QByteArray> #include <QByteArray>
#include <QTimer> #include <QTimer>
#include <QThread>
#include <QMutex>
#include <QNetworkRequest> #include <QNetworkRequest>
#include "dsp/devicesamplesource.h" #include "dsp/devicesamplesource.h"
@ -32,7 +34,7 @@
class QNetworkAccessManager; class QNetworkAccessManager;
class QNetworkReply; class QNetworkReply;
class FileInputThread; class FileInputWorker;
class DeviceAPI; class DeviceAPI;
class FileInput : public DeviceSampleSource { class FileInput : public DeviceSampleSource {
@ -333,7 +335,8 @@ public:
QMutex m_mutex; QMutex m_mutex;
FileInputSettings m_settings; FileInputSettings m_settings;
std::ifstream m_ifstream; std::ifstream m_ifstream;
FileInputThread* m_fileInputThread; FileInputWorker* m_fileInputWorker;
QThread m_fileInputWorkerThread;
QString m_deviceDescription; QString m_deviceDescription;
QString m_fileName; QString m_fileName;
int m_sampleRate; int m_sampleRate;
@ -345,6 +348,8 @@ public:
QNetworkAccessManager *m_networkManager; QNetworkAccessManager *m_networkManager;
QNetworkRequest m_networkRequest; QNetworkRequest m_networkRequest;
void startWorker();
void stopWorker();
void openFileStream(); void openFileStream();
void seekFileStream(int seekMillis); void seekFileStream(int seekMillis);
bool applySettings(const FileInputSettings& settings, bool force = false); bool applySettings(const FileInputSettings& settings, bool force = false);

View File

@ -21,18 +21,18 @@
#include <QDebug> #include <QDebug>
#include "dsp/filerecord.h" #include "dsp/filerecord.h"
#include "fileinputthread.h" #include "fileinputworker.h"
#include "dsp/samplesinkfifo.h" #include "dsp/samplesinkfifo.h"
#include "util/messagequeue.h" #include "util/messagequeue.h"
MESSAGE_CLASS_DEFINITION(FileInputThread::MsgReportEOF, Message) MESSAGE_CLASS_DEFINITION(FileInputWorker::MsgReportEOF, Message)
FileInputThread::FileInputThread(std::ifstream *samplesStream, FileInputWorker::FileInputWorker(std::ifstream *samplesStream,
SampleSinkFifo* sampleFifo, SampleSinkFifo* sampleFifo,
const QTimer& timer, const QTimer& timer,
MessageQueue *fileInputMessageQueue, MessageQueue *fileInputMessageQueue,
QObject* parent) : QObject* parent) :
QThread(parent), QObject(parent),
m_running(false), m_running(false),
m_ifstream(samplesStream), m_ifstream(samplesStream),
m_fileBuf(0), m_fileBuf(0),
@ -52,7 +52,7 @@ FileInputThread::FileInputThread(std::ifstream *samplesStream,
assert(m_ifstream != 0); assert(m_ifstream != 0);
} }
FileInputThread::~FileInputThread() FileInputWorker::~FileInputWorker()
{ {
if (m_running) { if (m_running) {
stopWork(); stopWork();
@ -67,20 +67,16 @@ FileInputThread::~FileInputThread()
} }
} }
void FileInputThread::startWork() void FileInputWorker::startWork()
{ {
qDebug() << "FileInputThread::startWork: "; qDebug() << "FileInputThread::startWork: ";
if (m_ifstream->is_open()) if (m_ifstream->is_open())
{ {
qDebug() << "FileInputThread::startWork: file stream open, starting..."; qDebug() << "FileInputThread::startWork: file stream open, starting...";
m_startWaitMutex.lock();
m_elapsedTimer.start(); m_elapsedTimer.start();
start();
while(!m_running)
m_startWaiter.wait(&m_startWaitMutex, 100);
m_startWaitMutex.unlock();
connect(&m_timer, SIGNAL(timeout()), this, SLOT(tick())); connect(&m_timer, SIGNAL(timeout()), this, SLOT(tick()));
m_running = true;
} }
else else
{ {
@ -88,15 +84,14 @@ void FileInputThread::startWork()
} }
} }
void FileInputThread::stopWork() void FileInputWorker::stopWork()
{ {
qDebug() << "FileInputThread::stopWork"; qDebug() << "FileInputThread::stopWork";
disconnect(&m_timer, SIGNAL(timeout()), this, SLOT(tick())); disconnect(&m_timer, SIGNAL(timeout()), this, SLOT(tick()));
m_running = false; m_running = false;
wait();
} }
void FileInputThread::setSampleRateAndSize(int samplerate, quint32 samplesize) void FileInputWorker::setSampleRateAndSize(int samplerate, quint32 samplesize)
{ {
qDebug() << "FileInputThread::setSampleRateAndSize:" qDebug() << "FileInputThread::setSampleRateAndSize:"
<< " new rate:" << samplerate << " new rate:" << samplerate
@ -121,7 +116,7 @@ void FileInputThread::setSampleRateAndSize(int samplerate, quint32 samplesize)
//m_samplerate = samplerate; //m_samplerate = samplerate;
} }
void FileInputThread::setBuffers(std::size_t chunksize) void FileInputWorker::setBuffers(std::size_t chunksize)
{ {
if (chunksize > m_bufsize) if (chunksize > m_bufsize)
{ {
@ -159,20 +154,7 @@ void FileInputThread::setBuffers(std::size_t chunksize)
} }
} }
void FileInputThread::run() void FileInputWorker::tick()
{
m_running = true;
m_startWaiter.wakeAll();
while(m_running) // actual work is in the tick() function
{
sleep(1);
}
m_running = false;
}
void FileInputThread::tick()
{ {
if (m_running) if (m_running)
{ {
@ -203,7 +185,7 @@ void FileInputThread::tick()
} }
} }
void FileInputThread::writeToSampleFifo(const quint8* buf, qint32 nbBytes) void FileInputWorker::writeToSampleFifo(const quint8* buf, qint32 nbBytes)
{ {
if (m_samplesize == 16) if (m_samplesize == 16)
{ {

View File

@ -15,12 +15,9 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>. // // along with this program. If not, see <http://www.gnu.org/licenses/>. //
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
#ifndef INCLUDE_FILEINPUTTHREAD_H #ifndef INCLUDE_FILEINPUTWORKER_H
#define INCLUDE_FILEINPUTTHREAD_H #define INCLUDE_FILEINPUTWORKER_H
#include <QThread>
#include <QMutex>
#include <QWaitCondition>
#include <QTimer> #include <QTimer>
#include <QElapsedTimer> #include <QElapsedTimer>
#include <iostream> #include <iostream>
@ -35,7 +32,7 @@
class SampleSinkFifo; class SampleSinkFifo;
class MessageQueue; class MessageQueue;
class FileInputThread : public QThread { class FileInputWorker : public QObject {
Q_OBJECT Q_OBJECT
public: public:
@ -53,12 +50,12 @@ public:
{ } { }
}; };
FileInputThread(std::ifstream *samplesStream, FileInputWorker(std::ifstream *samplesStream,
SampleSinkFifo* sampleFifo, SampleSinkFifo* sampleFifo,
const QTimer& timer, const QTimer& timer,
MessageQueue *fileInputMessageQueue, MessageQueue *fileInputMessageQueue,
QObject* parent = NULL); QObject* parent = NULL);
~FileInputThread(); ~FileInputWorker();
void startWork(); void startWork();
void stopWork(); void stopWork();
@ -69,8 +66,6 @@ public:
void setSamplesCount(quint64 samplesCount) { m_samplesCount = samplesCount; } void setSamplesCount(quint64 samplesCount) { m_samplesCount = samplesCount; }
private: private:
QMutex m_startWaitMutex;
QWaitCondition m_startWaiter;
volatile bool m_running; volatile bool m_running;
std::ifstream* m_ifstream; std::ifstream* m_ifstream;
@ -90,7 +85,6 @@ private:
QElapsedTimer m_elapsedTimer; QElapsedTimer m_elapsedTimer;
bool m_throttleToggle; bool m_throttleToggle;
void run();
//void decimate1(SampleVector::iterator* it, const qint16* buf, qint32 len); //void decimate1(SampleVector::iterator* it, const qint16* buf, qint32 len);
void writeToSampleFifo(const quint8* buf, qint32 nbBytes); void writeToSampleFifo(const quint8* buf, qint32 nbBytes);
@ -98,4 +92,4 @@ private slots:
void tick(); void tick();
}; };
#endif // INCLUDE_FILEINPUTTHREAD_H #endif // INCLUDE_FILEINPUTWORKER_H