APRS: implemented new message pipes. Part of #1154

This commit is contained in:
f4exb 2022-03-26 09:53:06 +01:00
parent 7689a16109
commit b1543c2b98
9 changed files with 195 additions and 56 deletions

View File

@ -227,16 +227,14 @@ bool ChirpChatDemod::handleMessage(const Message& cmd)
}
// Forward to APRS and other packet features
MessagePipesLegacy& messagePipes = MainCore::instance()->getMessagePipesLegacy();
QList<MessageQueue*> *packetMessageQueues = messagePipes.getMessageQueues(this, "packets");
if (packetMessageQueues)
QList<ObjectPipe*> packetsPipes;
MainCore::instance()->getMessagePipes().getMessagePipes(this, "packets", packetsPipes);
for (const auto& pipe : packetsPipes)
{
QList<MessageQueue*>::iterator it = packetMessageQueues->begin();
for (; it != packetMessageQueues->end(); ++it)
{
MainCore::MsgPacket *msg = MainCore::MsgPacket::create(this, packet, QDateTime::currentDateTime());
(*it)->push(msg);
}
MessageQueue *messageQueue = qobject_cast<MessageQueue*>(pipe->m_element);
MainCore::MsgPacket *msg = MainCore::MsgPacket::create(this, packet, QDateTime::currentDateTime());
messageQueue->push(msg);
}
}

View File

@ -171,18 +171,15 @@ bool PacketDemod::handleMessage(const Message& cmd)
getMessageQueueToGUI()->push(msg);
}
MessagePipesLegacy& messagePipes = MainCore::instance()->getMessagePipesLegacy();
// Forward to APRS and other packet features
QList<MessageQueue*> *packetMessageQueues = messagePipes.getMessageQueues(this, "packets");
if (packetMessageQueues)
QList<ObjectPipe*> packetsPipes;
MainCore::instance()->getMessagePipes().getMessagePipes(this, "packets", packetsPipes);
for (const auto& pipe : packetsPipes)
{
QList<MessageQueue*>::iterator it = packetMessageQueues->begin();
for (; it != packetMessageQueues->end(); ++it)
{
MainCore::MsgPacket *msg = new MainCore::MsgPacket(report);
(*it)->push(msg);
}
MessageQueue *messageQueue = qobject_cast<MessageQueue*>(pipe->m_element);
MainCore::MsgPacket *msg = new MainCore::MsgPacket(report);
messageQueue->push(msg);
}
// Forward via UDP

View File

@ -28,7 +28,6 @@
#include "feature/featuregui.h"
#include "util/messagequeue.h"
#include "util/ais.h"
#include "pipes/pipeendpoint.h"
#include "settings/rollupstate.h"
#include "aissettings.h"

View File

@ -37,6 +37,8 @@
MESSAGE_CLASS_DEFINITION(APRS::MsgConfigureAPRS, Message)
MESSAGE_CLASS_DEFINITION(APRS::MsgReportWorker, Message)
MESSAGE_CLASS_DEFINITION(APRS::MsgQueryAvailableChannels, Message)
MESSAGE_CLASS_DEFINITION(APRS::MsgReportAvailableChannels, Message)
const char* const APRS::m_featureIdURI = "sdrangel.feature.aprs";
const char* const APRS::m_featureId = "APRS";
@ -50,8 +52,6 @@ APRS::APRS(WebAPIAdapterInterface *webAPIAdapterInterface) :
m_worker->moveToThread(&m_thread);
m_state = StIdle;
m_errorMessage = "APRS error";
connect(&m_updatePipesTimer, SIGNAL(timeout()), this, SLOT(updatePipes()));
m_updatePipesTimer.start(1000);
m_networkManager = new QNetworkAccessManager();
QObject::connect(
m_networkManager,
@ -59,10 +59,13 @@ APRS::APRS(WebAPIAdapterInterface *webAPIAdapterInterface) :
this,
&APRS::networkManagerFinished
);
scanAvailableChannels();
connect(MainCore::instance(), SIGNAL(channelAdded(int, ChannelAPI*)), this, SLOT(handleChannelAdded(int, ChannelAPI*)));
}
APRS::~APRS()
{
disconnect(MainCore::instance(), SIGNAL(channelAdded(int, ChannelAPI*)), this, SLOT(handleChannelAdded(int, ChannelAPI*)));
QObject::disconnect(
m_networkManager,
&QNetworkAccessManager::finished,
@ -124,6 +127,11 @@ bool APRS::handleMessage(const Message& cmd)
}
return true;
}
else if (MsgQueryAvailableChannels::match(cmd))
{
notifyUpdateChannels();
return true;
}
else if (MainCore::MsgPacket::match(cmd))
{
MainCore::MsgPacket& report = (MainCore::MsgPacket&) cmd;
@ -145,23 +153,6 @@ bool APRS::handleMessage(const Message& cmd)
}
}
void APRS::updatePipes()
{
QList<AvailablePipeSource> availablePipes = updateAvailablePipeSources("packets", APRSSettings::m_pipeTypes, APRSSettings::m_pipeURIs, this);
if (availablePipes != m_availablePipes)
{
m_availablePipes = availablePipes;
if (getMessageQueueToGUI())
{
MsgReportPipes *msgToGUI = MsgReportPipes::create();
QList<AvailablePipeSource>& msgAvailablePipes = msgToGUI->getAvailablePipes();
msgAvailablePipes.append(availablePipes);
getMessageQueueToGUI()->push(msgToGUI);
}
}
}
QByteArray APRS::serialize() const
{
return m_settings.serialize();
@ -447,3 +438,114 @@ void APRS::networkManagerFinished(QNetworkReply *reply)
reply->deleteLater();
}
void APRS::scanAvailableChannels()
{
MainCore *mainCore = MainCore::instance();
MessagePipes& messagePipes = mainCore->getMessagePipes();
std::vector<DeviceSet*>& deviceSets = mainCore->getDeviceSets();
m_availableChannels.clear();
for (const auto& deviceSet : deviceSets)
{
DSPDeviceSourceEngine *deviceSourceEngine = deviceSet->m_deviceSourceEngine;
if (deviceSourceEngine)
{
for (int chi = 0; chi < deviceSet->getNumberOfChannels(); chi++)
{
ChannelAPI *channel = deviceSet->getChannelAt(chi);
if (APRSSettings::m_pipeURIs.contains(channel->getURI()) && !m_availableChannels.contains(channel))
{
qDebug("APRS::scanAvailableChannels: register %d:%d %s (%p)",
deviceSet->getIndex(), chi, qPrintable(channel->getURI()), channel);
ObjectPipe *pipe = messagePipes.registerProducerToConsumer(channel, this, "packets");
MessageQueue *messageQueue = qobject_cast<MessageQueue*>(pipe->m_element);
QObject::connect(
messageQueue,
&MessageQueue::messageEnqueued,
this,
[=](){ this->handleChannelMessageQueue(messageQueue); },
Qt::QueuedConnection
);
connect(pipe, SIGNAL(toBeDeleted(int, QObject*)), this, SLOT(handleMessagePipeToBeDeleted(int, QObject*)));
APRSSettings::AvailableChannel availableChannel =
APRSSettings::AvailableChannel{deviceSet->getIndex(), chi, channel->getIdentifier()};
m_availableChannels[channel] = availableChannel;
}
}
notifyUpdateChannels();
}
}
}
void APRS::notifyUpdateChannels()
{
if (getMessageQueueToGUI())
{
MsgReportAvailableChannels *msg = MsgReportAvailableChannels::create();
msg->getChannels() = m_availableChannels.values();
getMessageQueueToGUI()->push(msg);
}
}
void APRS::handleChannelAdded(int deviceSetIndex, ChannelAPI *channel)
{
qDebug("APRS::handleChannelAdded: deviceSetIndex: %d:%d channel: %s (%p)",
deviceSetIndex, channel->getIndexInDeviceSet(), qPrintable(channel->getURI()), channel);
DeviceSet *deviceSet = MainCore::instance()->getDeviceSets()[deviceSetIndex];
DSPDeviceSourceEngine *deviceSourceEngine = deviceSet->m_deviceSourceEngine;
if (deviceSourceEngine && APRSSettings::m_pipeURIs.contains(channel->getURI()))
{
int chi = channel->getIndexInDeviceSet();
if (!m_availableChannels.contains(channel))
{
MessagePipes& messagePipes = MainCore::instance()->getMessagePipes();
ObjectPipe *pipe = messagePipes.registerProducerToConsumer(channel, this, "packets");
MessageQueue *messageQueue = qobject_cast<MessageQueue*>(pipe->m_element);
QObject::connect(
messageQueue,
&MessageQueue::messageEnqueued,
this,
[=](){ this->handleChannelMessageQueue(messageQueue); },
Qt::QueuedConnection
);
connect(pipe, SIGNAL(toBeDeleted(int, QObject*)), this, SLOT(handleMessagePipeToBeDeleted(int, QObject*)));
}
APRSSettings::AvailableChannel availableChannel =
APRSSettings::AvailableChannel{deviceSet->getIndex(), chi, channel->getIdentifier()};
m_availableChannels[channel] = availableChannel;
notifyUpdateChannels();
}
}
void APRS::handleMessagePipeToBeDeleted(int reason, QObject* object)
{
if (reason == 0) // producer (channel)
{
if (m_availableChannels.contains((ChannelAPI*) object))
{
qDebug("APRS::handleMessagePipeToBeDeleted: removing channel at (%p)", object);
m_availableChannels.remove((ChannelAPI*) object);
notifyUpdateChannels();
}
}
}
void APRS::handleChannelMessageQueue(MessageQueue* messageQueue)
{
Message* message;
while ((message = messageQueue->pop()) != nullptr)
{
if (handleMessage(*message)) {
delete message;
}
}
}

View File

@ -22,7 +22,6 @@
#include <QThread>
#include <QHash>
#include <QNetworkRequest>
#include <QTimer>
#include "feature/feature.h"
#include "util/message.h"
@ -83,6 +82,38 @@ public:
{}
};
class MsgQueryAvailableChannels : public Message {
MESSAGE_CLASS_DECLARATION
public:
static MsgQueryAvailableChannels* create() {
return new MsgQueryAvailableChannels();
}
protected:
MsgQueryAvailableChannels() :
Message()
{ }
};
class MsgReportAvailableChannels : public Message {
MESSAGE_CLASS_DECLARATION
public:
QList<APRSSettings::AvailableChannel>& getChannels() { return m_availableChannels; }
static MsgReportAvailableChannels* create() {
return new MsgReportAvailableChannels();
}
private:
QList<APRSSettings::AvailableChannel> m_availableChannels;
MsgReportAvailableChannels() :
Message()
{}
};
APRS(WebAPIAdapterInterface *webAPIAdapterInterface);
virtual ~APRS();
virtual void destroy() { delete this; }
@ -124,8 +155,7 @@ private:
QThread m_thread;
APRSWorker *m_worker;
APRSSettings m_settings;
QList<PipeEndPoint::AvailablePipeSource> m_availablePipes;
QTimer m_updatePipesTimer;
QHash<ChannelAPI*, APRSSettings::AvailableChannel> m_availableChannels;
QNetworkAccessManager *m_networkManager;
QNetworkRequest m_networkRequest;
@ -134,10 +164,14 @@ private:
void stop();
void applySettings(const APRSSettings& settings, bool force = false);
void webapiReverseSendSettings(QList<QString>& featureSettingsKeys, const APRSSettings& settings, bool force);
void scanAvailableChannels();
void notifyUpdateChannels();
private slots:
void updatePipes();
void networkManagerFinished(QNetworkReply *reply);
void handleChannelAdded(int deviceSetIndex, ChannelAPI *channel);
void handleMessagePipeToBeDeleted(int reason, QObject* object);
void handleChannelMessageQueue(MessageQueue* messageQueue);
};
#endif // INCLUDE_FEATURE_APRS_H_

View File

@ -157,11 +157,11 @@ bool APRSGUI::handleMessage(const Message& message)
return true;
}
else if (PipeEndPoint::MsgReportPipes::match(message))
else if (APRS::MsgReportAvailableChannels::match(message))
{
PipeEndPoint::MsgReportPipes& report = (PipeEndPoint::MsgReportPipes&) message;
m_availablePipes = report.getAvailablePipes();
updatePipeList();
APRS::MsgReportAvailableChannels& report = (APRS::MsgReportAvailableChannels&) message;
m_availableChannels = report.getChannels();
updateChannelList();
return true;
}
@ -551,6 +551,8 @@ APRSGUI::APRSGUI(PluginAPI* pluginAPI, FeatureUISet *featureUISet, Feature *feat
m_settings.setRollupState(&m_rollupState);
m_aprs->getInputMessageQueue()->push(APRS::MsgQueryAvailableChannels::create());
displaySettings();
applySettings(true);
}
@ -637,15 +639,13 @@ void APRSGUI::displaySettings()
blockApplySettings(false);
}
void APRSGUI::updatePipeList()
void APRSGUI::updateChannelList()
{
ui->sourcePipes->blockSignals(true);
ui->sourcePipes->clear();
QList<PipeEndPoint::AvailablePipeSource>::const_iterator it = m_availablePipes.begin();
for (int i = 0; it != m_availablePipes.end(); ++it, i++)
{
ui->sourcePipes->addItem(it->getName());
for (const auto& channel : m_availableChannels) {
ui->sourcePipes->addItem(tr("R%1:%2 %3").arg(channel.m_deviceSetIndex).arg(channel.m_channelIndex).arg(channel.m_type));
}
ui->sourcePipes->blockSignals(false);

View File

@ -28,7 +28,6 @@
#include "feature/featuregui.h"
#include "util/messagequeue.h"
#include "pipes/pipeendpoint.h"
#include "util/aprs.h"
#include "settings/rollupstate.h"
@ -117,7 +116,7 @@ private:
APRSSettings m_settings;
RollupState m_rollupState;
bool m_doApplySettings;
QList<PipeEndPoint::AvailablePipeSource> m_availablePipes;
QList<APRSSettings::AvailableChannel> m_availableChannels;
APRS* m_aprs;
MessageQueue m_inputMessageQueue;
@ -154,7 +153,7 @@ private:
bool filterStation(APRSStation *station);
void filterStations();
void displaySettings();
void updatePipeList();
void updateChannelList();
bool handleMessage(const Message& message);
void leaveEvent(QEvent*);

View File

@ -36,6 +36,17 @@ class Serializable;
struct APRSSettings
{
struct AvailableChannel
{
int m_deviceSetIndex;
int m_channelIndex;
QString m_type;
AvailableChannel() = default;
AvailableChannel(const AvailableChannel&) = default;
AvailableChannel& operator=(const AvailableChannel&) = default;
};
QString m_igateServer;
int m_igatePort;
QString m_igateCallsign;

View File

@ -28,7 +28,6 @@
#include "feature/featuregui.h"
#include "util/messagequeue.h"
#include "util/radiosonde.h"
#include "pipes/pipeendpoint.h"
#include "settings/rollupstate.h"
#include "radiosondesettings.h"