#1346: Update threading model in Star Tracker, PER Tester and APRS features

This commit is contained in:
Jon Beniston 2022-09-20 11:48:25 +01:00
parent 487f46c8ca
commit cd3c674ced
12 changed files with 109 additions and 148 deletions

View File

@ -44,12 +44,12 @@ const char* const APRS::m_featureIdURI = "sdrangel.feature.aprs";
const char* const APRS::m_featureId = "APRS";
APRS::APRS(WebAPIAdapterInterface *webAPIAdapterInterface) :
Feature(m_featureIdURI, webAPIAdapterInterface)
Feature(m_featureIdURI, webAPIAdapterInterface),
m_thread(nullptr),
m_worker(nullptr)
{
qDebug("APRS::APRS: webAPIAdapterInterface: %p", webAPIAdapterInterface);
setObjectName(m_featureId);
m_worker = new APRSWorker(this, webAPIAdapterInterface);
m_worker->moveToThread(&m_thread);
m_state = StIdle;
m_errorMessage = "APRS error";
m_networkManager = new QNetworkAccessManager();
@ -83,22 +83,24 @@ APRS::~APRS()
&APRS::networkManagerFinished
);
delete m_networkManager;
if (m_worker->isRunning()) {
stop();
}
delete m_worker;
stop();
}
void APRS::start()
{
qDebug("APRS::start");
m_worker->reset();
m_thread = new QThread(this);
m_worker = new APRSWorker(this, m_webAPIAdapterInterface);
m_worker->moveToThread(m_thread);
QObject::connect(m_thread, &QThread::started, m_worker, &APRSWorker::startWork);
QObject::connect(m_thread, &QThread::finished, m_worker, &QObject::deleteLater);
QObject::connect(m_thread, &QThread::finished, m_thread, &QThread::deleteLater);
m_worker->setMessageQueueToFeature(getInputMessageQueue());
m_worker->setMessageQueueToGUI(getMessageQueueToGUI());
bool ok = m_worker->startWork();
m_state = ok ? StIdle : StError;
m_thread.start();
m_thread->start();
m_state = StRunning;
APRSWorker::MsgConfigureAPRSWorker *msg = APRSWorker::MsgConfigureAPRSWorker::create(m_settings, true);
m_worker->getInputMessageQueue()->push(msg);
@ -107,10 +109,14 @@ void APRS::start()
void APRS::stop()
{
qDebug("APRS::stop");
m_worker->stopWork();
m_state = StIdle;
m_thread.quit();
m_thread.wait();
if (m_thread)
{
m_thread->quit();
m_thread->wait();
m_thread = nullptr;
m_worker = nullptr;
}
}
bool APRS::handleMessage(const Message& cmd)
@ -150,7 +156,7 @@ bool APRS::handleMessage(const Message& cmd)
MainCore::MsgPacket *copy = new MainCore::MsgPacket(report);
getMessageQueueToGUI()->push(copy);
}
if (m_state == StRunning)
if (m_worker)
{
MainCore::MsgPacket *copy = new MainCore::MsgPacket(report);
m_worker->getInputMessageQueue()->push(copy);
@ -219,7 +225,9 @@ void APRS::applySettings(const APRSSettings& settings, bool force)
APRSWorker::MsgConfigureAPRSWorker *msg = APRSWorker::MsgConfigureAPRSWorker::create(
settings, force
);
m_worker->getInputMessageQueue()->push(msg);
if (m_worker) {
m_worker->getInputMessageQueue()->push(msg);
}
if (settings.m_useReverseAPI)
{

View File

@ -153,7 +153,7 @@ public:
static const char* const m_featureId;
private:
QThread m_thread;
QThread *m_thread;
APRSWorker *m_worker;
APRSSettings m_settings;
QHash<ChannelAPI*, APRSSettings::AvailableChannel> m_availableChannels;

View File

@ -37,7 +37,6 @@ APRSWorker::APRSWorker(APRS *aprs, WebAPIAdapterInterface *webAPIAdapterInterfac
m_webAPIAdapterInterface(webAPIAdapterInterface),
m_msgQueueToFeature(nullptr),
m_msgQueueToGUI(nullptr),
m_running(false),
m_socket(this)
{
connect(&m_socket, SIGNAL(readyRead()),this, SLOT(recv()));
@ -55,42 +54,25 @@ APRSWorker::~APRSWorker()
m_inputMessageQueue.clear();
}
void APRSWorker::reset()
{
QMutexLocker mutexLocker(&m_mutex);
m_inputMessageQueue.clear();
}
bool APRSWorker::startWork()
void APRSWorker::startWork()
{
qDebug("APRSWorker::startWork");
QMutexLocker mutexLocker(&m_mutex);
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()));
connect(thread(), SIGNAL(started()), this, SLOT(started()));
connect(thread(), SIGNAL(finished()), this, SLOT(finished()));
m_running = true;
return m_running;
}
// startWork() is called from main thread. Timers need to be started on worker thread
void APRSWorker::started()
{
disconnect(thread(), SIGNAL(started()), this, SLOT(started()));
connect(thread(), SIGNAL(finished()), this, SLOT(stopWork()));
// Handle any messages already on the queue
handleInputMessages();
}
void APRSWorker::stopWork()
{
qDebug("APRSWorker::stopWork");
QMutexLocker mutexLocker(&m_mutex);
disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()));
}
void APRSWorker::finished()
{
// Close any existing connection
if (m_socket.isOpen()) {
m_socket.close();
}
m_running = false;
disconnect(thread(), SIGNAL(finished()), this, SLOT(finished()));
disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()));
}
void APRSWorker::handleInputMessages()

View File

@ -59,10 +59,8 @@ public:
APRSWorker(APRS *m_aprs, WebAPIAdapterInterface *webAPIAdapterInterface);
~APRSWorker();
void reset();
bool startWork();
void startWork();
void stopWork();
bool isRunning() const { return m_running; }
MessageQueue *getInputMessageQueue() { return &m_inputMessageQueue; }
void setMessageQueueToFeature(MessageQueue *messageQueue) { m_msgQueueToFeature = messageQueue; }
void setMessageQueueToGUI(MessageQueue *messageQueue) { m_msgQueueToGUI = messageQueue; }
@ -75,7 +73,6 @@ private:
MessageQueue *m_msgQueueToFeature; //!< Queue to report channel change to main feature object
MessageQueue *m_msgQueueToGUI;
APRSSettings m_settings;
bool m_running;
QRecursiveMutex m_mutex;
QTcpSocket m_socket;
bool m_loggedIn;
@ -86,8 +83,6 @@ private:
void send(const char *data, int length);
private slots:
void started();
void finished();
void handleInputMessages();
void connected();
void disconnected();

View File

@ -48,12 +48,12 @@ const char* const PERTester::m_featureIdURI = "sdrangel.feature.pertester";
const char* const PERTester::m_featureId = "PERTester";
PERTester::PERTester(WebAPIAdapterInterface *webAPIAdapterInterface) :
Feature(m_featureIdURI, webAPIAdapterInterface)
Feature(m_featureIdURI, webAPIAdapterInterface),
m_thread(nullptr),
m_worker(nullptr)
{
qDebug("PERTester::PERTester: webAPIAdapterInterface: %p", webAPIAdapterInterface);
setObjectName(m_featureId);
m_worker = new PERTesterWorker();
m_worker->moveToThread(&m_thread);
m_state = StIdle;
m_errorMessage = "PERTester error";
m_networkManager = new QNetworkAccessManager();
@ -74,40 +74,48 @@ PERTester::~PERTester()
&PERTester::networkManagerFinished
);
delete m_networkManager;
if (m_worker->isRunning()) {
stop();
}
delete m_worker;
stop();
}
void PERTester::start()
{
qDebug("PERTester::start");
m_thread = new QThread(this);
m_worker = new PERTesterWorker();
m_worker->moveToThread(m_thread);
QObject::connect(m_thread, &QThread::started, m_worker, &PERTesterWorker::startWork);
QObject::connect(m_thread, &QThread::finished, m_worker, &QObject::deleteLater);
QObject::connect(m_thread, &QThread::finished, m_thread, &QThread::deleteLater);
m_worker->setMessageQueueToFeature(getInputMessageQueue());
m_worker->setMessageQueueToGUI(getMessageQueueToGUI());
m_worker->getInputMessageQueue()->push(PERTesterWorker::MsgConfigurePERTesterWorker::create(m_settings, true));
if (m_settings.m_start == PERTesterSettings::START_IMMEDIATELY)
{
bool ok = m_worker->startWork();
m_state = ok ? StRunning : StError;
m_thread->start();
m_state = StRunning;
}
else
{
// Wait for AOS
m_state = StIdle;
}
m_thread.start();
m_thread->start();
}
void PERTester::stop()
{
qDebug("PERTester::stop");
m_worker->stopWork();
m_state = StIdle;
m_thread.quit();
m_thread.wait();
if (m_thread)
{
m_thread->quit();
m_thread->wait();
m_thread = nullptr;
m_worker = nullptr;
}
}
bool PERTester::handleMessage(const Message& cmd)
@ -135,7 +143,9 @@ bool PERTester::handleMessage(const Message& cmd)
}
else if (MsgResetStats::match(cmd))
{
m_worker->getInputMessageQueue()->push(MsgResetStats::create());
if (m_worker) {
m_worker->getInputMessageQueue()->push(MsgResetStats::create());
}
return true;
}
else if (MsgReportWorker::match(cmd))
@ -249,7 +259,9 @@ void PERTester::applySettings(const PERTesterSettings& settings, bool force)
PERTesterWorker::MsgConfigurePERTesterWorker *msg = PERTesterWorker::MsgConfigurePERTesterWorker::create(
settings, force
);
m_worker->getInputMessageQueue()->push(msg);
if (m_worker) {
m_worker->getInputMessageQueue()->push(msg);
}
if (settings.m_useReverseAPI)
{
@ -555,8 +567,8 @@ int PERTester::webapiActionsPost(
{
if (m_settings.m_start == PERTesterSettings::START_ON_AOS)
{
bool ok = m_worker->startWork();
m_state = ok ? StRunning : StError;
m_thread->start();
m_state = StRunning;
}
else if (m_settings.m_start == PERTesterSettings::START_ON_MID_PASS)
{
@ -566,8 +578,8 @@ int PERTester::webapiActionsPost(
QDateTime losTime = QDateTime::fromString(losTimeString);
qint64 msecs = aosTime.msecsTo(losTime) / 2;
QTimer::singleShot(msecs, [this] {
bool ok = m_worker->startWork();
m_state = ok ? StRunning : StError;
m_thread->start();
m_state = StRunning;
});
}
}

View File

@ -164,7 +164,7 @@ public:
static const char* const m_featureId;
private:
QThread m_thread;
QThread *m_thread;
PERTesterWorker *m_worker;
PERTesterSettings m_settings;

View File

@ -39,7 +39,6 @@ MESSAGE_CLASS_DEFINITION(PERTesterReport::MsgReportStats, Message)
PERTesterWorker::PERTesterWorker() :
m_msgQueueToFeature(nullptr),
m_msgQueueToGUI(nullptr),
m_running(false),
m_rxUDPSocket(nullptr),
m_txUDPSocket(this),
m_txTimer(this),
@ -57,46 +56,27 @@ PERTesterWorker::~PERTesterWorker()
m_inputMessageQueue.clear();
}
void PERTesterWorker::reset()
{
QMutexLocker mutexLocker(&m_mutex);
m_inputMessageQueue.clear();
}
bool PERTesterWorker::startWork()
void PERTesterWorker::startWork()
{
qDebug() << "PERTesterWorker::startWork";
QMutexLocker mutexLocker(&m_mutex);
connect(thread(), SIGNAL(started()), this, SLOT(started()));
connect(thread(), SIGNAL(finished()), this, SLOT(finished()));
m_running = true;
return m_running;
}
// startWork() is called from main thread. Timers/sockets need to be started on worker thread
void PERTesterWorker::started()
{
openUDP(m_settings);
// Automatically restart if previous run had finished, otherwise continue
if (m_tx >= m_settings.m_packetCount)
resetStats();
connect(&m_txTimer, SIGNAL(timeout()), this, SLOT(tx()));
connect(thread(), SIGNAL(finished()), this, SLOT(stopWork()));
m_txTimer.start(m_settings.m_interval * 1000.0);
disconnect(thread(), SIGNAL(started()), this, SLOT(started()));
// Handle any messages already on the queue
handleInputMessages();
}
void PERTesterWorker::stopWork()
{
qDebug() << "PERTesterWorker::stopWork";
}
void PERTesterWorker::finished()
{
m_txTimer.stop();
closeUDP();
disconnect(&m_txTimer, SIGNAL(timeout()), this, SLOT(tx()));
disconnect(thread(), SIGNAL(finished()), this, SLOT(finished()));
m_running = false;
}
void PERTesterWorker::handleInputMessages()

View File

@ -57,10 +57,8 @@ public:
PERTesterWorker();
~PERTesterWorker();
void reset();
bool startWork();
void startWork();
void stopWork();
bool isRunning() const { return m_running; }
MessageQueue *getInputMessageQueue() { return &m_inputMessageQueue; }
void setMessageQueueToFeature(MessageQueue *messageQueue) { m_msgQueueToFeature = messageQueue; }
void setMessageQueueToGUI(MessageQueue *messageQueue) { m_msgQueueToGUI = messageQueue; }
@ -71,7 +69,6 @@ private:
MessageQueue *m_msgQueueToFeature;
MessageQueue *m_msgQueueToGUI;
PERTesterSettings m_settings;
bool m_running;
QRecursiveMutex m_mutex;
QUdpSocket *m_rxUDPSocket; //!< UDP socket to receive packets on
QUdpSocket m_txUDPSocket;
@ -89,8 +86,6 @@ private:
void resetStats();
private slots:
void started();
void finished();
void handleInputMessages();
void rx();
void tx();

View File

@ -46,12 +46,12 @@ const char* const StarTracker::m_featureIdURI = "sdrangel.feature.startracker";
const char* const StarTracker::m_featureId = "StarTracker";
StarTracker::StarTracker(WebAPIAdapterInterface *webAPIAdapterInterface) :
Feature(m_featureIdURI, webAPIAdapterInterface)
Feature(m_featureIdURI, webAPIAdapterInterface),
m_thread(nullptr),
m_worker(nullptr)
{
qDebug("StarTracker::StarTracker: webAPIAdapterInterface: %p", webAPIAdapterInterface);
setObjectName(m_featureId);
m_worker = new StarTrackerWorker(this, webAPIAdapterInterface);
m_worker->moveToThread(&m_thread);
m_state = StIdle;
m_errorMessage = "StarTracker error";
m_networkManager = new QNetworkAccessManager();
@ -92,11 +92,8 @@ StarTracker::~StarTracker()
&StarTracker::networkManagerFinished
);
delete m_networkManager;
if (m_worker->isRunning()) {
stop();
}
stop();
delete m_worker;
if (m_weather)
{
disconnect(m_weather, &Weather::weatherUpdated, this, &StarTracker::weatherUpdated);
@ -110,12 +107,19 @@ void StarTracker::start()
{
qDebug("StarTracker::start");
m_worker->reset();
m_thread = new QThread(this);
m_worker = new StarTrackerWorker(this, m_webAPIAdapterInterface);
m_worker->moveToThread(m_thread);
QObject::connect(m_thread, &QThread::started, m_worker, &StarTrackerWorker::startWork);
QObject::connect(m_thread, &QThread::finished, m_worker, &QObject::deleteLater);
QObject::connect(m_thread, &QThread::finished, m_thread, &QThread::deleteLater);
m_worker->setMessageQueueToFeature(getInputMessageQueue());
m_worker->setMessageQueueToGUI(getMessageQueueToGUI());
bool ok = m_worker->startWork();
m_state = ok ? StRunning : StError;
m_thread.start();
m_thread->start();
m_thread->start();
m_state = StRunning;
m_worker->getInputMessageQueue()->push(StarTrackerWorker::MsgConfigureStarTrackerWorker::create(m_settings, true));
m_worker->getInputMessageQueue()->push(MsgSetSolarFlux::create(m_solarFlux));
@ -124,10 +128,14 @@ void StarTracker::start()
void StarTracker::stop()
{
qDebug("StarTracker::stop");
m_worker->stopWork();
m_state = StIdle;
m_thread.quit();
m_thread.wait();
if (m_thread)
{
m_thread->quit();
m_thread->wait();
m_thread = nullptr;
m_worker = nullptr;
}
}
bool StarTracker::handleMessage(const Message& cmd)
@ -157,7 +165,9 @@ bool StarTracker::handleMessage(const Message& cmd)
{
MsgSetSolarFlux& msg = (MsgSetSolarFlux&) cmd;
m_solarFlux = msg.getFlux();
m_worker->getInputMessageQueue()->push(new MsgSetSolarFlux(msg));
if (m_worker) {
m_worker->getInputMessageQueue()->push(new MsgSetSolarFlux(msg));
}
return true;
}
else if (MainCore::MsgStarTrackerDisplaySettings::match(cmd))
@ -338,7 +348,9 @@ void StarTracker::applySettings(const StarTrackerSettings& settings, bool force)
StarTrackerWorker::MsgConfigureStarTrackerWorker *msg = StarTrackerWorker::MsgConfigureStarTrackerWorker::create(
settings, force
);
m_worker->getInputMessageQueue()->push(msg);
if (m_worker) {
m_worker->getInputMessageQueue()->push(msg);
}
if (settings.m_useReverseAPI)
{
@ -741,7 +753,9 @@ void StarTracker::weatherUpdated(float temperature, float pressure, float humidi
m_settings.m_humidity = humidity;
}
m_worker->getInputMessageQueue()->push(StarTrackerWorker::MsgConfigureStarTrackerWorker::create(m_settings, false));
if (m_worker) {
m_worker->getInputMessageQueue()->push(StarTrackerWorker::MsgConfigureStarTrackerWorker::create(m_settings, false));
}
if (m_guiMessageQueue) {
m_guiMessageQueue->push(MsgConfigureStarTracker::create(m_settings, false));
}

View File

@ -155,7 +155,7 @@ public:
static const char* const m_featureId;
private:
QThread m_thread;
QThread *m_thread;
StarTrackerWorker *m_worker;
StarTrackerSettings m_settings;

View File

@ -50,7 +50,6 @@ StarTrackerWorker::StarTrackerWorker(StarTracker* starTracker, WebAPIAdapterInte
m_webAPIAdapterInterface(webAPIAdapterInterface),
m_msgQueueToFeature(nullptr),
m_msgQueueToGUI(nullptr),
m_running(false),
m_pollTimer(this),
m_tcpServer(nullptr),
m_clientConnection(nullptr),
@ -64,41 +63,22 @@ StarTrackerWorker::~StarTrackerWorker()
m_inputMessageQueue.clear();
}
void StarTrackerWorker::reset()
{
QMutexLocker mutexLocker(&m_mutex);
m_inputMessageQueue.clear();
}
bool StarTrackerWorker::startWork()
void StarTrackerWorker::startWork()
{
QMutexLocker mutexLocker(&m_mutex);
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()));
connect(thread(), SIGNAL(started()), this, SLOT(started()));
connect(thread(), SIGNAL(finished()), this, SLOT(finished()));
m_running = true;
return m_running;
}
// startWork() is called from main thread. Timers/sockets need to be started on worker thread
void StarTrackerWorker::started()
{
connect(thread(), SIGNAL(finished()), this, SLOT(stopWork()));
m_pollTimer.start((int)round(m_settings.m_updatePeriod*1000.0));
disconnect(thread(), SIGNAL(started()), this, SLOT(started()));
// Handle any messages already on the queue
handleInputMessages();
}
void StarTrackerWorker::stopWork()
{
QMutexLocker mutexLocker(&m_mutex);
disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()));
}
void StarTrackerWorker::finished()
{
restartServer(false, 0);
m_pollTimer.stop();
disconnect(thread(), SIGNAL(finished()), this, SLOT(finished()));
m_running = false;
}
void StarTrackerWorker::handleInputMessages()

View File

@ -65,10 +65,8 @@ public:
StarTrackerWorker(StarTracker* starTracker, WebAPIAdapterInterface *webAPIAdapterInterface);
~StarTrackerWorker();
void reset();
bool startWork();
void startWork();
void stopWork();
bool isRunning() const { return m_running; }
MessageQueue *getInputMessageQueue() { return &m_inputMessageQueue; }
void setMessageQueueToFeature(MessageQueue *messageQueue) { m_msgQueueToFeature = messageQueue; }
void setMessageQueueToGUI(MessageQueue *messageQueue) { m_msgQueueToGUI = messageQueue; }
@ -81,7 +79,6 @@ private:
MessageQueue *m_msgQueueToFeature; //!< Queue to report channel change to main feature object
MessageQueue *m_msgQueueToGUI;
StarTrackerSettings m_settings;
bool m_running;
QRecursiveMutex m_mutex;
QTimer m_pollTimer;
QTcpServer *m_tcpServer;
@ -106,8 +103,6 @@ private:
);
private slots:
void started();
void finished();
void handleInputMessages();
void update();
void acceptConnection();