Add support for message pipes from features to channels

This commit is contained in:
Jon Beniston 2021-10-12 11:18:29 +01:00
parent cddc8c9b83
commit 05fce637bc
8 changed files with 95 additions and 42 deletions

View File

@ -130,6 +130,15 @@ public:
virtual int getNbSourceStreams() const = 0;
virtual qint64 getStreamCenterFrequency(int streamIndex, bool sinkElseSource) const = 0;
void handlePipeMessageQueue(MessageQueue* messageQueue)
{
Message* message;
while ((message = messageQueue->pop()) != nullptr) {
m_channelMessageQueue.push(message);
}
}
protected:
MessageQueue *m_guiMessageQueue; //!< Input message queue to the GUI
MessageQueue m_channelMessageQueue; //!< Input message queue for inter plugin communication

View File

@ -42,14 +42,14 @@ MessagePipes::~MessagePipes()
}
}
MessageQueue *MessagePipes::registerChannelToFeature(const PipeEndPoint *source, Feature *feature, const QString& type)
MessageQueue *MessagePipes::registerChannelToFeature(const PipeEndPoint *source, PipeEndPoint *dest, const QString& type)
{
return m_registrations.registerProducerToConsumer(source, feature, type);
return m_registrations.registerProducerToConsumer(source, dest, type);
}
MessageQueue *MessagePipes::unregisterChannelToFeature(const PipeEndPoint *source, Feature *feature, const QString& type)
MessageQueue *MessagePipes::unregisterChannelToFeature(const PipeEndPoint *source, PipeEndPoint *dest, const QString& type)
{
MessageQueue *messageQueue = m_registrations.unregisterProducerToConsumer(source, feature, type);
MessageQueue *messageQueue = m_registrations.unregisterProducerToConsumer(source, dest, type);
m_gcWorker->addMessageQueueToDelete(messageQueue);
return messageQueue;
}

View File

@ -30,7 +30,6 @@
#include "elementpipesregistrations.h"
class PipeEndPoint;
class Feature;
class MessagePipesGCWorker;
class MessageQueue;
@ -43,12 +42,13 @@ public:
MessagePipes& operator=(const MessagePipes&) = delete;
~MessagePipes();
MessageQueue *registerChannelToFeature(const PipeEndPoint *source, Feature *feature, const QString& type);
MessageQueue *unregisterChannelToFeature(const PipeEndPoint *source, Feature *feature, const QString& type);
// FIXME: Names of these functions should probably change, as we now support channel or feature at either end
MessageQueue *registerChannelToFeature(const PipeEndPoint *source, PipeEndPoint *dest, const QString& type);
MessageQueue *unregisterChannelToFeature(const PipeEndPoint *source, PipeEndPoint *dest, const QString& type);
QList<MessageQueue*>* getMessageQueues(const PipeEndPoint *source, const QString& type);
private:
ElementPipesRegistrations<PipeEndPoint, Feature, MessageQueue> m_registrations;
ElementPipesRegistrations<PipeEndPoint, PipeEndPoint, MessageQueue> m_registrations;
QThread m_gcThread; //!< Garbage collector thread
MessagePipesGCWorker *m_gcWorker; //!< Garbage collector

View File

@ -27,7 +27,6 @@
#include "elementpipescommon.h"
class PipeEndPoint;
class Feature;
class MessageQueue;
class SDRBASE_API MessagePipesCommon

View File

@ -24,26 +24,33 @@
bool MessagePipesGCWorker::MessagePipesGC::existsProducer(const PipeEndPoint *pipeEndPoint)
{
// Not overly sure about casting to both types here, but currently safeish as the
// existing functions only use the pointer address - and I presume these
// may be pointers to deleted objects anyway?
return MainCore::instance()->existsChannel((const ChannelAPI *)pipeEndPoint)
|| MainCore::instance()->existsFeature((const Feature *)pipeEndPoint);
}
bool MessagePipesGCWorker::MessagePipesGC::existsConsumer(const Feature *feature)
bool MessagePipesGCWorker::MessagePipesGC::existsConsumer(const PipeEndPoint *pipeEndPoint)
{
return MainCore::instance()->existsFeature(feature);
return MainCore::instance()->existsChannel((const ChannelAPI *)pipeEndPoint)
|| MainCore::instance()->existsFeature((const Feature *)pipeEndPoint);
}
void MessagePipesGCWorker::MessagePipesGC::sendMessageToConsumer(
const MessageQueue *messageQueue,
MessagePipesCommon::ChannelRegistrationKey channelKey,
Feature *feature)
PipeEndPoint *pipeEndPoint)
{
MessagePipesCommon::MsgReportChannelDeleted *msg = MessagePipesCommon::MsgReportChannelDeleted::create(
messageQueue, channelKey);
feature->getInputMessageQueue()->push(msg);
if (MainCore::instance()->existsFeature((const Feature *)pipeEndPoint)) // Use RTTI instead?
{
Feature *feature = (Feature *)pipeEndPoint;
feature->getInputMessageQueue()->push(msg);
}
else
{
ChannelAPI *channel = (ChannelAPI *)pipeEndPoint;
channel->getChannelMessageQueue()->push(msg);
}
}
MessagePipesGCWorker::MessagePipesGCWorker() :

View File

@ -38,10 +38,10 @@ public:
void setC2FRegistrations(
QMutex *c2fMutex,
QMap<MessagePipesCommon::ChannelRegistrationKey, QList<MessageQueue*>> *c2fQueues,
QMap<MessagePipesCommon::ChannelRegistrationKey, QList<Feature*>> *c2fFeatures
QMap<MessagePipesCommon::ChannelRegistrationKey, QList<PipeEndPoint*>> *c2fPipeEndPoints
)
{
m_messagePipesGC.setRegistrations(c2fMutex, c2fQueues, c2fFeatures);
m_messagePipesGC.setRegistrations(c2fMutex, c2fQueues, c2fPipeEndPoints);
}
void startWork();
@ -50,12 +50,12 @@ public:
bool isRunning() const { return m_running; }
private:
class MessagePipesGC : public ElementPipesGC<PipeEndPoint, Feature, MessageQueue>
class MessagePipesGC : public ElementPipesGC<PipeEndPoint, PipeEndPoint, MessageQueue>
{
private:
virtual bool existsProducer(const PipeEndPoint *pipeEndPoint);
virtual bool existsConsumer(const Feature *feature);
virtual void sendMessageToConsumer(const MessageQueue *messageQueue, MessagePipesCommon::ChannelRegistrationKey key, Feature *feature);
virtual bool existsConsumer(const PipeEndPoint *pipeEndPoint);
virtual void sendMessageToConsumer(const MessageQueue *messageQueue, MessagePipesCommon::ChannelRegistrationKey key, PipeEndPoint *pipeEndPoint);
};
MessagePipesGC m_messagePipesGC;

View File

@ -31,13 +31,14 @@
MESSAGE_CLASS_DEFINITION(PipeEndPoint::MsgReportPipes, Message)
QList<PipeEndPoint::AvailablePipeSource> PipeEndPoint::updateAvailablePipeSources(QString pipeName, QStringList pipeTypes, QStringList pipeURIs, Feature *destinationFeature)
QList<PipeEndPoint::AvailablePipeSource> PipeEndPoint::updateAvailablePipeSources(QString pipeName, QStringList pipeTypes, QStringList pipeURIs, PipeEndPoint *destination)
{
MainCore *mainCore = MainCore::instance();
MessagePipes& messagePipes = mainCore->getMessagePipes();
std::vector<DeviceSet*>& deviceSets = mainCore->getDeviceSets();
QHash<PipeEndPoint *, AvailablePipeSource> availablePipes;
// Source is a channel
int deviceIndex = 0;
for (std::vector<DeviceSet*>::const_iterator it = deviceSets.begin(); it != deviceSets.end(); ++it, deviceIndex++)
{
@ -55,14 +56,30 @@ QList<PipeEndPoint::AvailablePipeSource> PipeEndPoint::updateAvailablePipeSource
{
if (!availablePipes.contains(channel))
{
MessageQueue *messageQueue = messagePipes.registerChannelToFeature(channel, destinationFeature, pipeName);
QObject::connect(
messageQueue,
&MessageQueue::messageEnqueued,
destinationFeature,
[=](){ destinationFeature->handlePipeMessageQueue(messageQueue); },
Qt::QueuedConnection
);
MessageQueue *messageQueue = messagePipes.registerChannelToFeature(channel, destination, pipeName);
if (MainCore::instance()->existsFeature((const Feature *)destination))
{
// Destination is feature
Feature *featureDest = (Feature *)destination;
QObject::connect(
messageQueue,
&MessageQueue::messageEnqueued,
featureDest,
[=](){ featureDest->handlePipeMessageQueue(messageQueue); },
Qt::QueuedConnection
);
}
else
{
// Destination is a channel
// Can't use Qt::QueuedConnection because ChannelAPI isn't a QObject
ChannelAPI *channelDest = (ChannelAPI *)destination;
QObject::connect(
messageQueue,
&MessageQueue::messageEnqueued,
[=](){ channelDest->handlePipeMessageQueue(messageQueue); }
);
}
}
AvailablePipeSource availablePipe =
@ -79,6 +96,7 @@ QList<PipeEndPoint::AvailablePipeSource> PipeEndPoint::updateAvailablePipeSource
}
}
// Source is a feature
std::vector<FeatureSet*>& featureSets = mainCore->getFeatureeSets();
int featureIndex = 0;
for (std::vector<FeatureSet*>::const_iterator it = featureSets.begin(); it != featureSets.end(); ++it, featureIndex++)
@ -92,14 +110,30 @@ QList<PipeEndPoint::AvailablePipeSource> PipeEndPoint::updateAvailablePipeSource
{
if (!availablePipes.contains(feature))
{
MessageQueue *messageQueue = messagePipes.registerChannelToFeature(feature, destinationFeature, pipeName);
QObject::connect(
messageQueue,
&MessageQueue::messageEnqueued,
destinationFeature,
[=](){ destinationFeature->handlePipeMessageQueue(messageQueue); },
Qt::QueuedConnection
);
MessageQueue *messageQueue = messagePipes.registerChannelToFeature(feature, destination, pipeName);
if (MainCore::instance()->existsFeature((const Feature *)destination))
{
// Destination is feature
Feature *featureDest = (Feature *)destination;
QObject::connect(
messageQueue,
&MessageQueue::messageEnqueued,
featureDest,
[=](){ featureDest->handlePipeMessageQueue(messageQueue); },
Qt::QueuedConnection
);
}
else
{
// Destination is a channel
// Can't use Qt::QueuedConnection because ChannelAPI isn't a QObject
ChannelAPI *channelDest = (ChannelAPI *)destination;
QObject::connect(
messageQueue,
&MessageQueue::messageEnqueued,
[=](){ channelDest->handlePipeMessageQueue(messageQueue); }
);
}
}
AvailablePipeSource availablePipe =
@ -135,13 +169,17 @@ PipeEndPoint *PipeEndPoint::getPipeEndPoint(const QString name, const QList<Avai
QString id = re.capturedTexts()[4];
QListIterator<AvailablePipeSource> itr(availablePipeSources);
while (itr.hasNext()) {
while (itr.hasNext())
{
AvailablePipeSource p = itr.next();
if ((p.m_setIndex == setIndex) && (p.m_index == index) && (id == p.m_id))
if ((p.m_setIndex == setIndex) && (p.m_index == index) && (id == p.m_id)) {
return p.m_source;
}
}
}
else
{
qDebug() << "PipeEndPoint::getPipeEndPoint: " << name << " is malformed";
}
return nullptr;
}

View File

@ -32,7 +32,7 @@ class Feature;
class SDRBASE_API PipeEndPoint {
public:
// Used by pipe sinks (features) to record details about available pipe sources (channels or features)
// Used by pipe sinks (channels or features) to record details about available pipe sources (channels or features)
struct AvailablePipeSource
{
enum {RX, TX, Feature} m_type;
@ -92,7 +92,7 @@ public:
protected:
// Utility functions for pipe sinks to manage list of sources
QList<AvailablePipeSource> updateAvailablePipeSources(QString pipeName, QStringList pipeTypes, QStringList pipeURIs, Feature *destinationFeature);
QList<AvailablePipeSource> updateAvailablePipeSources(QString pipeName, QStringList pipeTypes, QStringList pipeURIs, PipeEndPoint *destination);
PipeEndPoint *getPipeEndPoint(const QString name, const QList<AvailablePipeSource> &availablePipeSources);
};