mirror of
				https://github.com/f4exb/sdrangel.git
				synced 2025-10-31 04:50:29 -04:00 
			
		
		
		
	Implemented data pipes with generic element pipes
This commit is contained in:
		
							parent
							
								
									3ffe923d8d
								
							
						
					
					
						commit
						af3f016f0b
					
				| @ -135,7 +135,7 @@ bool AFC::handleMessage(const Message& cmd) | ||||
|         qDebug() << "AFC::handleMessage: MessagePipesCommon::MsgReportChannelDeleted"; | ||||
|         MessagePipesCommon::MsgReportChannelDeleted& report = (MessagePipesCommon::MsgReportChannelDeleted&) cmd; | ||||
|         const MessagePipesCommon::ChannelRegistrationKey& channelKey = report.getChannelRegistrationKey(); | ||||
|         const ChannelAPI *channel = channelKey.m_channel; | ||||
|         const ChannelAPI *channel = channelKey.m_key; | ||||
|         MainCore::instance()->getMessagePipes().unregisterChannelToFeature(channel, this, "settings"); | ||||
| 
 | ||||
|         return true; | ||||
|  | ||||
| @ -244,7 +244,7 @@ bool VORLocalizer::handleMessage(const Message& cmd) | ||||
|         qDebug() << "VORLocalizer::handleMessage: MsgReportChannelDeleted"; | ||||
|         MessagePipesCommon::MsgReportChannelDeleted& report = (MessagePipesCommon::MsgReportChannelDeleted&) cmd; | ||||
|         const MessagePipesCommon::ChannelRegistrationKey& channelKey = report.getChannelRegistrationKey(); | ||||
|         const ChannelAPI *channel = channelKey.m_channel; | ||||
|         const ChannelAPI *channel = channelKey.m_key; | ||||
|         m_availableChannels.remove(const_cast<ChannelAPI*>(channel)); | ||||
|         updateChannels(); | ||||
|         MainCore::instance()->getMessagePipes().unregisterChannelToFeature(channel, this, "report"); | ||||
|  | ||||
| @ -99,6 +99,7 @@ set(sdrbase_SOURCES | ||||
|     dsp/channelsamplesource.cpp | ||||
|     dsp/cwkeyer.cpp | ||||
|     dsp/cwkeyersettings.cpp | ||||
|     dsp/datafifo.cpp | ||||
|     dsp/decimatorsff.cpp | ||||
|     dsp/decimatorsfi.cpp | ||||
|     dsp/decimatorc.cpp | ||||
| @ -160,6 +161,9 @@ set(sdrbase_SOURCES | ||||
| 
 | ||||
|     limerfe/limerfeusbcalib.cpp | ||||
| 
 | ||||
|     pipes/datapipes.cpp | ||||
|     pipes/datapipescommon.cpp | ||||
|     pipes/datapipesgcworker.cpp | ||||
|     pipes/messagepipes.cpp | ||||
|     pipes/messagepipescommon.cpp | ||||
|     pipes/messagepipesgcworker.cpp | ||||
| @ -248,6 +252,7 @@ set(sdrbase_HEADERS | ||||
|     dsp/ctcssfrequencies.h | ||||
|     dsp/cwkeyer.h | ||||
|     dsp/cwkeyersettings.h | ||||
|     dsp/datafifo.h | ||||
|     dsp/decimators.h | ||||
|     dsp/decimatorsif.h | ||||
|     dsp/decimatorsff.h | ||||
| @ -336,6 +341,11 @@ set(sdrbase_HEADERS | ||||
| 
 | ||||
|     limerfe/limerfeusbcalib.h | ||||
| 
 | ||||
|     pipes/datapipes.h | ||||
|     pipes/datapipescommon.h | ||||
|     pipes/datapipesgcworker.h | ||||
|     pipes/elementpipescommon.h | ||||
|     pipes/elementpipesgc.h | ||||
|     pipes/messagepipes.h | ||||
|     pipes/messagepipescommon.h | ||||
|     pipes/messagepipesgcworker.h | ||||
|  | ||||
| @ -29,7 +29,6 @@ | ||||
| #include "export.h" | ||||
| 
 | ||||
| class DeviceAPI; | ||||
| class Feature; | ||||
| 
 | ||||
| namespace SWGSDRangel | ||||
| { | ||||
|  | ||||
							
								
								
									
										287
									
								
								sdrbase/dsp/datafifo.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										287
									
								
								sdrbase/dsp/datafifo.cpp
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,287 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 F4EXB                                                      //
 | ||||
| // written by Edouard Griffiths                                                  //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #include "datafifo.h" | ||||
| 
 | ||||
| void DataFifo::create(unsigned int s) | ||||
| { | ||||
| 	m_size = 0; | ||||
| 	m_fill = 0; | ||||
| 	m_head = 0; | ||||
| 	m_tail = 0; | ||||
| 
 | ||||
| 	m_data.resize(s); | ||||
| 	m_size = m_data.size(); | ||||
| } | ||||
| 
 | ||||
| void DataFifo::reset() | ||||
| { | ||||
| 	m_suppressed = -1; | ||||
| 	m_fill = 0; | ||||
| 	m_head = 0; | ||||
| 	m_tail = 0; | ||||
| } | ||||
| 
 | ||||
| DataFifo::DataFifo(QObject* parent) : | ||||
| 	QObject(parent), | ||||
| 	m_data() | ||||
| { | ||||
| 	m_suppressed = -1; | ||||
| 	m_size = 0; | ||||
| 	m_fill = 0; | ||||
| 	m_head = 0; | ||||
| 	m_tail = 0; | ||||
| } | ||||
| 
 | ||||
| DataFifo::DataFifo(int size, QObject* parent) : | ||||
| 	QObject(parent), | ||||
| 	m_data() | ||||
| { | ||||
| 	m_suppressed = -1; | ||||
| 	create(size); | ||||
| } | ||||
| 
 | ||||
| DataFifo::DataFifo(const DataFifo& other) : | ||||
|     QObject(other.parent()), | ||||
|     m_data(other.m_data) | ||||
| { | ||||
|   	m_suppressed = -1; | ||||
| 	m_size = m_data.size(); | ||||
| 	m_fill = 0; | ||||
| 	m_head = 0; | ||||
| 	m_tail = 0; | ||||
| } | ||||
| 
 | ||||
| DataFifo::~DataFifo() | ||||
| { | ||||
| 	QMutexLocker mutexLocker(&m_mutex); | ||||
| 	m_size = 0; | ||||
| } | ||||
| 
 | ||||
| bool DataFifo::setSize(int size) | ||||
| { | ||||
| 	create(size); | ||||
| 
 | ||||
| 	return m_data.size() == (unsigned int)size; | ||||
| } | ||||
| 
 | ||||
| unsigned int DataFifo::write(const quint8* data, unsigned int count) | ||||
| { | ||||
| 	QMutexLocker mutexLocker(&m_mutex); | ||||
| 	unsigned int total; | ||||
| 	unsigned int remaining; | ||||
| 	unsigned int len; | ||||
| 	const quint8* begin = (const quint8*) data; | ||||
| 	//count /= sizeof(Sample);
 | ||||
| 
 | ||||
| 	total = std::min(count, m_size - m_fill); | ||||
| 
 | ||||
|     if (total < count) | ||||
|     { | ||||
| 		if (m_suppressed < 0) | ||||
|         { | ||||
| 			m_suppressed = 0; | ||||
| 			m_msgRateTimer.start(); | ||||
| 			qCritical("DataFifo::write: overflow - dropping %u samples", count - total); | ||||
| 		} | ||||
|         else | ||||
|         { | ||||
| 			if (m_msgRateTimer.elapsed() > 2500) | ||||
|             { | ||||
| 				qCritical("DataFifo::write: %u messages dropped", m_suppressed); | ||||
| 				qCritical("DataFifo::write: overflow - dropping %u samples", count - total); | ||||
| 				m_suppressed = -1; | ||||
| 			} | ||||
|             else | ||||
|             { | ||||
| 				m_suppressed++; | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	remaining = total; | ||||
| 
 | ||||
|     while (remaining > 0) | ||||
|     { | ||||
| 		len = std::min(remaining, m_size - m_tail); | ||||
| 		std::copy(begin, begin + len, m_data.begin() + m_tail); | ||||
| 		m_tail += len; | ||||
| 		m_tail %= m_size; | ||||
| 		m_fill += len; | ||||
| 		begin += len; | ||||
| 		remaining -= len; | ||||
| 	} | ||||
| 
 | ||||
| 	if (m_fill > 0) { | ||||
| 		emit dataReady(); | ||||
|     } | ||||
| 
 | ||||
| 	return total; | ||||
| } | ||||
| 
 | ||||
| unsigned int DataFifo::write(QByteArray::const_iterator begin, QByteArray::const_iterator end) | ||||
| { | ||||
| 	QMutexLocker mutexLocker(&m_mutex); | ||||
| 	unsigned int count = end - begin; | ||||
| 	unsigned int total; | ||||
| 	unsigned int remaining; | ||||
| 	unsigned int len; | ||||
| 
 | ||||
| 	total = std::min(count, m_size - m_fill); | ||||
| 
 | ||||
|     if (total < count) | ||||
|     { | ||||
| 		if (m_suppressed < 0) | ||||
|         { | ||||
| 			m_suppressed = 0; | ||||
| 			m_msgRateTimer.start(); | ||||
| 			qCritical("DataFifo::write: overflow - dropping %u samples", count - total); | ||||
| 		} | ||||
|         else | ||||
|         { | ||||
| 			if (m_msgRateTimer.elapsed() > 2500) | ||||
|             { | ||||
| 				qCritical("DataFifo::write: %u messages dropped", m_suppressed); | ||||
| 				qCritical("DataFifo::write: overflow - dropping %u samples", count - total); | ||||
| 				m_suppressed = -1; | ||||
| 			} | ||||
|             else | ||||
|             { | ||||
| 				m_suppressed++; | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	remaining = total; | ||||
| 
 | ||||
|     while (remaining > 0) | ||||
|     { | ||||
| 		len = std::min(remaining, m_size - m_tail); | ||||
| 		std::copy(begin, begin + len, m_data.begin() + m_tail); | ||||
| 		m_tail += len; | ||||
| 		m_tail %= m_size; | ||||
| 		m_fill += len; | ||||
| 		begin += len; | ||||
| 		remaining -= len; | ||||
| 	} | ||||
| 
 | ||||
| 	if (m_fill > 0) { | ||||
| 		emit dataReady(); | ||||
|     } | ||||
| 
 | ||||
| 	return total; | ||||
| } | ||||
| 
 | ||||
| unsigned int DataFifo::read(QByteArray::iterator begin, QByteArray::iterator end) | ||||
| { | ||||
| 	QMutexLocker mutexLocker(&m_mutex); | ||||
| 	unsigned int count = end - begin; | ||||
| 	unsigned int total; | ||||
| 	unsigned int remaining; | ||||
| 	unsigned int len; | ||||
| 
 | ||||
| 	total = std::min(count, m_fill); | ||||
| 
 | ||||
|     if (total < count) { | ||||
| 		qCritical("DataFifo::read: underflow - missing %u samples", count - total); | ||||
|     } | ||||
| 
 | ||||
| 	remaining = total; | ||||
| 
 | ||||
|     while (remaining > 0) | ||||
|     { | ||||
| 		len = std::min(remaining, m_size - m_head); | ||||
| 		std::copy(m_data.begin() + m_head, m_data.begin() + m_head + len, begin); | ||||
| 		m_head += len; | ||||
| 		m_head %= m_size; | ||||
| 		m_fill -= len; | ||||
| 		begin += len; | ||||
| 		remaining -= len; | ||||
| 	} | ||||
| 
 | ||||
| 	return total; | ||||
| } | ||||
| 
 | ||||
| unsigned int DataFifo::readBegin(unsigned int count, | ||||
| 	QByteArray::iterator* part1Begin, QByteArray::iterator* part1End, | ||||
| 	QByteArray::iterator* part2Begin, QByteArray::iterator* part2End) | ||||
| { | ||||
| 	QMutexLocker mutexLocker(&m_mutex); | ||||
| 	unsigned int total; | ||||
| 	unsigned int remaining; | ||||
| 	unsigned int len; | ||||
| 	unsigned int head = m_head; | ||||
| 
 | ||||
| 	total = std::min(count, m_fill); | ||||
| 
 | ||||
|     if (total < count) { | ||||
| 		qCritical("DataFifo::readBegin: underflow - missing %u samples", count - total); | ||||
|     } | ||||
| 
 | ||||
| 	remaining = total; | ||||
| 
 | ||||
|     if (remaining > 0) | ||||
|     { | ||||
| 		len = std::min(remaining, m_size - head); | ||||
| 		*part1Begin = m_data.begin() + head; | ||||
| 		*part1End = m_data.begin() + head + len; | ||||
| 		head += len; | ||||
| 		head %= m_size; | ||||
| 		remaining -= len; | ||||
| 	} | ||||
|     else | ||||
|     { | ||||
| 		*part1Begin = m_data.end(); | ||||
| 		*part1End = m_data.end(); | ||||
| 	} | ||||
| 
 | ||||
|     if (remaining > 0) | ||||
|     { | ||||
| 		len = std::min(remaining, m_size - head); | ||||
| 		*part2Begin = m_data.begin() + head; | ||||
| 		*part2End = m_data.begin() + head + len; | ||||
| 	} | ||||
|     else | ||||
|     { | ||||
| 		*part2Begin = m_data.end(); | ||||
| 		*part2End = m_data.end(); | ||||
| 	} | ||||
| 
 | ||||
| 	return total; | ||||
| } | ||||
| 
 | ||||
| unsigned int DataFifo::readCommit(unsigned int count) | ||||
| { | ||||
| 	QMutexLocker mutexLocker(&m_mutex); | ||||
| 
 | ||||
| 	if (count > m_fill) | ||||
|     { | ||||
| 		qCritical("DataFifo::readCommit: cannot commit more than available samples"); | ||||
| 		count = m_fill; | ||||
| 	} | ||||
| 
 | ||||
|     m_head = (m_head + count) % m_size; | ||||
| 	m_fill -= count; | ||||
| 
 | ||||
| 	return count; | ||||
| } | ||||
| 
 | ||||
| unsigned int DataFifo::getSizePolicy(unsigned int sampleRate) | ||||
| { | ||||
|     return (sampleRate/100)*64; // .64s
 | ||||
| } | ||||
							
								
								
									
										73
									
								
								sdrbase/dsp/datafifo.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										73
									
								
								sdrbase/dsp/datafifo.h
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,73 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 F4EXB                                                      //
 | ||||
| // written by Edouard Griffiths                                                  //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #ifndef INCLUDE_DATAFIFO_H | ||||
| #define INCLUDE_DATAFIFO_H | ||||
| 
 | ||||
| #include <QObject> | ||||
| #include <QMutex> | ||||
| #include <QElapsedTimer> | ||||
| #include <QByteArray> | ||||
| 
 | ||||
| #include "dsp/dsptypes.h" | ||||
| #include "export.h" | ||||
| 
 | ||||
| class SDRBASE_API DataFifo : public QObject { | ||||
| 	Q_OBJECT | ||||
| 
 | ||||
| private: | ||||
| 	QMutex m_mutex; | ||||
| 	QElapsedTimer m_msgRateTimer; | ||||
| 	int m_suppressed; | ||||
| 
 | ||||
| 	QByteArray m_data; | ||||
| 
 | ||||
| 	unsigned int m_size; | ||||
| 	unsigned int m_fill; | ||||
| 	unsigned int m_head; | ||||
| 	unsigned int m_tail; | ||||
| 
 | ||||
| 	void create(unsigned int s); | ||||
| 
 | ||||
| public: | ||||
| 	DataFifo(QObject* parent = nullptr); | ||||
| 	DataFifo(int size, QObject* parent = nullptr); | ||||
|     DataFifo(const DataFifo& other); | ||||
| 	~DataFifo(); | ||||
| 
 | ||||
| 	bool setSize(int size); | ||||
|     void reset(); | ||||
| 	inline unsigned int size() const { return m_size; } | ||||
| 	inline unsigned int fill() { QMutexLocker mutexLocker(&m_mutex); unsigned int fill = m_fill; return fill; } | ||||
| 
 | ||||
| 	unsigned int write(const quint8* data, unsigned int count); | ||||
| 	unsigned int write(QByteArray::const_iterator begin, QByteArray::const_iterator end); | ||||
| 
 | ||||
| 	unsigned int read(QByteArray::iterator begin, QByteArray::iterator end); | ||||
| 
 | ||||
| 	unsigned int readBegin(unsigned int count, | ||||
| 		QByteArray::iterator* part1Begin, QByteArray::iterator* part1End, | ||||
| 		QByteArray::iterator* part2Begin, QByteArray::iterator* part2End); | ||||
| 	unsigned int readCommit(unsigned int count); | ||||
|     static unsigned int getSizePolicy(unsigned int sampleRate); | ||||
| 
 | ||||
| signals: | ||||
| 	void dataReady(); | ||||
| }; | ||||
| 
 | ||||
| #endif // INCLUDE_DATAFIFO_H
 | ||||
							
								
								
									
										71
									
								
								sdrbase/pipes/datapipes.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										71
									
								
								sdrbase/pipes/datapipes.cpp
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,71 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 Edouard Griffiths, F4EXB                                   //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #include "dsp/datafifo.h" | ||||
| 
 | ||||
| #include "datapipesgcworker.h" | ||||
| #include "datapipes.h" | ||||
| 
 | ||||
| DataPipes::DataPipes() | ||||
| { | ||||
| 	m_gcWorker = new DataPipesGCWorker(); | ||||
| 	m_gcWorker->setC2FRegistrations( | ||||
| 		m_registrations.getMutex(), | ||||
| 		m_registrations.getElements(), | ||||
| 		m_registrations.getConsumers() | ||||
| 	); | ||||
| 	m_gcWorker->moveToThread(&m_gcThread); | ||||
| 	startGC(); | ||||
| } | ||||
| 
 | ||||
| DataPipes::~DataPipes() | ||||
| { | ||||
| 	if (m_gcWorker->isRunning()) { | ||||
| 		stopGC(); | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| DataFifo *DataPipes::registerChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type) | ||||
| { | ||||
| 	return m_registrations.registerProducerToConsumer(source, feature, type); | ||||
| } | ||||
| 
 | ||||
| void DataPipes::unregisterChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type) | ||||
| { | ||||
| 	m_registrations.unregisterProducerToConsumer(source, feature, type); | ||||
| } | ||||
| 
 | ||||
| QList<DataFifo*>* DataPipes::getFifos(const ChannelAPI *source, const QString& type) | ||||
| { | ||||
| 	return m_registrations.getElements(source, type); | ||||
| } | ||||
| 
 | ||||
| void DataPipes::startGC() | ||||
| { | ||||
| 	qDebug("DataPipes::startGC"); | ||||
| 
 | ||||
|     m_gcWorker->startWork(); | ||||
|     m_gcThread.start(); | ||||
| } | ||||
| 
 | ||||
| void DataPipes::stopGC() | ||||
| { | ||||
|     qDebug("DataPipes::stopGC"); | ||||
| 	m_gcWorker->stopWork(); | ||||
| 	m_gcThread.quit(); | ||||
| 	m_gcThread.wait(); | ||||
| } | ||||
							
								
								
									
										59
									
								
								sdrbase/pipes/datapipes.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								sdrbase/pipes/datapipes.h
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,59 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 Edouard Griffiths, F4EXB                                   //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #ifndef SDRBASE_PIPES_DATAPIPES_H_ | ||||
| #define SDRBASE_PIPES_DATAPIPES_H_ | ||||
| 
 | ||||
| #include <QObject> | ||||
| #include <QHash> | ||||
| #include <QMap> | ||||
| #include <QMutex> | ||||
| #include <QThread> | ||||
| 
 | ||||
| #include "export.h" | ||||
| 
 | ||||
| #include "datapipescommon.h" | ||||
| #include "elementpipesregistrations.h" | ||||
| 
 | ||||
| class ChannelAPI; | ||||
| class Feature; | ||||
| class DataPipesGCWorker; | ||||
| class DataFifo; | ||||
| 
 | ||||
| class SDRBASE_API DataPipes : public QObject | ||||
| { | ||||
|     Q_OBJECT | ||||
| public: | ||||
|     DataPipes(); | ||||
|     DataPipes(const DataPipes&) = delete; | ||||
|     DataPipes& operator=(const DataPipes&) = delete; | ||||
|     ~DataPipes(); | ||||
| 
 | ||||
|     DataFifo *registerChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type); | ||||
|     void unregisterChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type); | ||||
|     QList<DataFifo*>* getFifos(const ChannelAPI *source, const QString& type); | ||||
| 
 | ||||
| private: | ||||
|     ElementPipesRegistrations<ChannelAPI, Feature, DataFifo> m_registrations; | ||||
|     QThread m_gcThread; //!< Garbage collector thread
 | ||||
|     DataPipesGCWorker *m_gcWorker; //!< Garbage collector
 | ||||
| 
 | ||||
| 	void startGC(); //!< Start garbage collector
 | ||||
| 	void stopGC();  //!< Stop garbage collector
 | ||||
| }; | ||||
| 
 | ||||
| #endif // SDRBASE_PIPES_DATAPIPES_H_
 | ||||
							
								
								
									
										20
									
								
								sdrbase/pipes/datapipescommon.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								sdrbase/pipes/datapipescommon.cpp
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,20 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 Edouard Griffiths, F4EXB                                   //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #include "datapipescommon.h" | ||||
| 
 | ||||
| MESSAGE_CLASS_DEFINITION(DataPipesCommon::MsgReportChannelDeleted, Message) | ||||
							
								
								
									
										63
									
								
								sdrbase/pipes/datapipescommon.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								sdrbase/pipes/datapipescommon.h
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,63 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 Edouard Griffiths, F4EXB                                   //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #ifndef SDRBASE_PIPES_DATAPIPESCOMON_H_ | ||||
| #define SDRBASE_PIPES_DATAPIPESCOMON_H_ | ||||
| 
 | ||||
| #include <QHash> | ||||
| #include <QMap> | ||||
| #include <QMutex> | ||||
| 
 | ||||
| #include "export.h" | ||||
| #include "util/message.h" | ||||
| 
 | ||||
| #include "elementpipescommon.h" | ||||
| 
 | ||||
| class ChannelAPI; | ||||
| class Feature; | ||||
| class DataFifo; | ||||
| 
 | ||||
| class SDRBASE_API DataPipesCommon | ||||
| { | ||||
| public: | ||||
|     typedef ElementPipesCommon::RegistrationKey<ChannelAPI> ChannelRegistrationKey; | ||||
| 
 | ||||
|     /** Send this message to stakeholders when the garbage collector finds that a channel was deleted */ | ||||
|     class SDRBASE_API MsgReportChannelDeleted : public Message { | ||||
|         MESSAGE_CLASS_DECLARATION | ||||
| 
 | ||||
|     public: | ||||
|         const DataFifo *getFifo() const { return m_fifo; } | ||||
|         const ChannelRegistrationKey& getChannelRegistrationKey() const { return m_channelRegistrationKey; } | ||||
| 
 | ||||
|         static MsgReportChannelDeleted* create(const DataFifo *fifo, const ChannelRegistrationKey& channelRegistrationKey) { | ||||
|             return new MsgReportChannelDeleted(fifo, channelRegistrationKey); | ||||
|         } | ||||
| 
 | ||||
|     private: | ||||
|         const DataFifo *m_fifo; | ||||
|         ChannelRegistrationKey m_channelRegistrationKey; | ||||
| 
 | ||||
|         MsgReportChannelDeleted(const DataFifo *fifo, const ChannelRegistrationKey& channelRegistrationKey) : | ||||
|             Message(), | ||||
|             m_fifo(fifo), | ||||
|             m_channelRegistrationKey(channelRegistrationKey) | ||||
|         { } | ||||
|     }; | ||||
| }; | ||||
| 
 | ||||
| #endif // SDRBASE_PIPES_DATAPIPESCOMON_H_
 | ||||
							
								
								
									
										66
									
								
								sdrbase/pipes/datapipesgcworker.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										66
									
								
								sdrbase/pipes/datapipesgcworker.cpp
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,66 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 Edouard Griffiths, F4EXB                                   //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #include "feature/feature.h" | ||||
| #include "dsp/datafifo.h" | ||||
| #include "maincore.h" | ||||
| #include "datapipescommon.h" | ||||
| #include "datapipesgcworker.h" | ||||
| 
 | ||||
| bool DataPipesGCWorker::DataPipesGC::existsProducer(const ChannelAPI *channel) | ||||
| { | ||||
|     return MainCore::instance()->existsChannel(channel); | ||||
| } | ||||
| 
 | ||||
| bool DataPipesGCWorker::DataPipesGC::existsConsumer(const Feature *feature) | ||||
| { | ||||
|     return MainCore::instance()->existsFeature(feature); | ||||
| } | ||||
| 
 | ||||
| void DataPipesGCWorker::DataPipesGC::sendMessageToConsumer(const DataFifo *fifo,  DataPipesCommon::ChannelRegistrationKey channelKey, Feature *feature) | ||||
| { | ||||
|     DataPipesCommon::MsgReportChannelDeleted *msg = DataPipesCommon::MsgReportChannelDeleted::create( | ||||
|         fifo, channelKey); | ||||
|     feature->getInputMessageQueue()->push(msg); | ||||
| } | ||||
| 
 | ||||
| DataPipesGCWorker::DataPipesGCWorker() : | ||||
|     m_running(false) | ||||
| {} | ||||
| 
 | ||||
| DataPipesGCWorker::~DataPipesGCWorker() | ||||
| {} | ||||
| 
 | ||||
| void DataPipesGCWorker::startWork() | ||||
| { | ||||
|     connect(&m_gcTimer, SIGNAL(timeout()), this, SLOT(processGC())); | ||||
|     m_gcTimer.start(10000); // collect garbage every 10s
 | ||||
|     m_running = true; | ||||
| } | ||||
| 
 | ||||
| void DataPipesGCWorker::stopWork() | ||||
| { | ||||
|     m_running = false; | ||||
|     m_gcTimer.stop(); | ||||
|     disconnect(&m_gcTimer, SIGNAL(timeout()), this, SLOT(processGC())); | ||||
| } | ||||
| 
 | ||||
| void DataPipesGCWorker::processGC() | ||||
| { | ||||
|     // qDebug("MessagePipesGCWorker::processGC");
 | ||||
|     m_dataPipesGC.processGC(); | ||||
| } | ||||
							
								
								
									
										69
									
								
								sdrbase/pipes/datapipesgcworker.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								sdrbase/pipes/datapipesgcworker.h
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,69 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 Edouard Griffiths, F4EXB                                   //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #ifndef SDRBASE_PIPES_DATAPIPESGCWORKER_H_ | ||||
| #define SDRBASE_PIPES_DATAPIPESGCWORKER_H_ | ||||
| 
 | ||||
| #include <QObject> | ||||
| #include <QTimer> | ||||
| 
 | ||||
| #include "export.h" | ||||
| 
 | ||||
| #include "elementpipesgc.h" | ||||
| #include "datapipescommon.h" | ||||
| 
 | ||||
| class QMutex; | ||||
| class DataFifo; | ||||
| 
 | ||||
| class SDRBASE_API DataPipesGCWorker : public QObject | ||||
| { | ||||
|     Q_OBJECT | ||||
| public: | ||||
|     DataPipesGCWorker(); | ||||
|     ~DataPipesGCWorker(); | ||||
| 
 | ||||
|     void setC2FRegistrations( | ||||
|         QMutex *c2fMutex, | ||||
|         QMap<DataPipesCommon::ChannelRegistrationKey, QList<DataFifo*>> *c2fFifos, | ||||
|         QMap<DataPipesCommon::ChannelRegistrationKey, QList<Feature*>> *c2fFeatures | ||||
|     ) | ||||
|     { | ||||
|         m_dataPipesGC.setRegistrations(c2fMutex, c2fFifos, c2fFeatures); | ||||
|     } | ||||
| 
 | ||||
|     void startWork(); | ||||
|     void stopWork(); | ||||
|     bool isRunning() const { return m_running; } | ||||
| 
 | ||||
| private: | ||||
|     class DataPipesGC : public ElementPipesGC<ChannelAPI, Feature, DataFifo> | ||||
|     { | ||||
|     private: | ||||
|         virtual bool existsProducer(const ChannelAPI *channelAPI); | ||||
|         virtual bool existsConsumer(const Feature *feature); | ||||
|         virtual void sendMessageToConsumer(const DataFifo *fifo,  DataPipesCommon::ChannelRegistrationKey key, Feature *feature); | ||||
|     }; | ||||
| 
 | ||||
|     DataPipesGC m_dataPipesGC; | ||||
|     bool m_running; | ||||
|     QTimer m_gcTimer; | ||||
| 
 | ||||
| private slots: | ||||
|     void processGC(); //!< Collect garbage
 | ||||
| }; | ||||
| 
 | ||||
| #endif // SDRBASE_PIPES_DATAPIPESGCWORKER_H_
 | ||||
							
								
								
									
										45
									
								
								sdrbase/pipes/elementpipescommon.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								sdrbase/pipes/elementpipescommon.h
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,45 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 Edouard Griffiths, F4EXB                                   //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #ifndef SDRBASE_PIPES_ELEMNTPIPESCOMMON_H_ | ||||
| #define SDRBASE_PIPES_ELEMNTPIPESCOMMON_H_ | ||||
| 
 | ||||
| namespace ElementPipesCommon { | ||||
| 
 | ||||
| template<typename T> | ||||
| struct RegistrationKey | ||||
| { | ||||
|     const T *m_key; | ||||
|     int m_typeId; | ||||
| 
 | ||||
|     RegistrationKey() = default; | ||||
|     RegistrationKey(const RegistrationKey&) = default; | ||||
|     RegistrationKey& operator=(const RegistrationKey&) = default; | ||||
| 
 | ||||
|     bool operator<(const RegistrationKey& other) const | ||||
|     { | ||||
|         if (m_key !=  other.m_key) { | ||||
|             return m_key < other.m_key; | ||||
|         } else { | ||||
|             return m_typeId < other.m_typeId; | ||||
|         } | ||||
|     } | ||||
| }; | ||||
| 
 | ||||
| } // ElementPipesCommon
 | ||||
| 
 | ||||
| #endif // SDRBASE_PIPES_ELEMNTPIPESCOMMON_H_
 | ||||
							
								
								
									
										139
									
								
								sdrbase/pipes/elementpipesgc.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										139
									
								
								sdrbase/pipes/elementpipesgc.h
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,139 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 Edouard Griffiths, F4EXB                                   //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #ifndef SDRBASE_PIPES_ELEMNTPIPESGCWORKER_H_ | ||||
| #define SDRBASE_PIPES_ELEMNTPIPESGCWORKER_H_ | ||||
| 
 | ||||
| #include <QMap> | ||||
| #include <QList> | ||||
| #include <QMutex> | ||||
| 
 | ||||
| #include "elementpipescommon.h" | ||||
| 
 | ||||
| template<typename Producer, typename Consumer, typename Element> | ||||
| class ElementPipesGC | ||||
| { | ||||
| public: | ||||
|     ElementPipesGC() : | ||||
|         m_mutex(nullptr), | ||||
|         m_elements(nullptr), | ||||
|         m_consumers(nullptr) | ||||
|     {} | ||||
| 
 | ||||
|     ~ElementPipesGC() | ||||
|     {} | ||||
| 
 | ||||
|     void setRegistrations( | ||||
|         QMutex *mutex, | ||||
|         QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Element*>> *elements, | ||||
|         QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Consumer*>> *consumers | ||||
|     ) | ||||
|     { | ||||
|         m_mutex = mutex; | ||||
|         m_elements = elements; | ||||
|         m_consumers = consumers; | ||||
|     } | ||||
| 
 | ||||
|     void processGC() | ||||
|     { | ||||
|         if (m_mutex) | ||||
|         { | ||||
|             QMutexLocker mlock(m_mutex); | ||||
|             typename QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Consumer*>>::iterator cIt = m_consumers->begin(); | ||||
| 
 | ||||
|             // remove fifos to be deleted from last run
 | ||||
|             for (typename QList<Element*>::iterator elIt = m_elementsToDelete.begin(); elIt != m_elementsToDelete.end(); ++elIt) { | ||||
|                 delete *elIt; | ||||
|             } | ||||
| 
 | ||||
|             m_elementsToDelete.clear(); | ||||
| 
 | ||||
|             // remove keys with empty features
 | ||||
|             while (cIt != m_consumers->end()) | ||||
|             { | ||||
|                 if (cIt.value().size() == 0) { | ||||
|                     cIt = m_consumers->erase(cIt); | ||||
|                 } else { | ||||
|                     ++cIt; | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             // remove keys with empty fifos
 | ||||
|             typename QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Element*>>::iterator elIt = m_elements->begin(); | ||||
| 
 | ||||
|             while (elIt != m_elements->end()) | ||||
|             { | ||||
|                 if (elIt.value().size() == 0) { | ||||
|                     elIt = m_elements->erase(elIt); | ||||
|                 } else { | ||||
|                     ++elIt; | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             // check deleted channels and features
 | ||||
|             cIt = m_consumers->begin(); | ||||
| 
 | ||||
|             for (;cIt != m_consumers->end(); ++cIt) | ||||
|             { | ||||
|                 ElementPipesCommon::RegistrationKey<Producer> producerKey = cIt.key(); | ||||
|                 const Producer *producer = producerKey.m_key; | ||||
| 
 | ||||
|                 if (existsProducer(producer)) // look for deleted features
 | ||||
|                 { | ||||
|                     QList<Consumer*>& consumers = cIt.value(); | ||||
|                     int i = 0; | ||||
| 
 | ||||
|                     while (i < consumers.size()) | ||||
|                     { | ||||
|                         if (existsConsumer(consumers[i])) { | ||||
|                             i++; | ||||
|                         } | ||||
|                         else | ||||
|                         { | ||||
|                             consumers.removeAt(i); | ||||
|                             Element *element = m_elements->operator[](producerKey)[i]; | ||||
|                             m_elementsToDelete.append(element); | ||||
|                             m_elements->operator[](producerKey).removeAt(i); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|                 else // channel was destroyed
 | ||||
|                 { | ||||
|                     QList<Consumer*>& consumers = cIt.value(); | ||||
| 
 | ||||
|                     for (int i = 0; i < consumers.size(); i++) { | ||||
|                         sendMessageToConsumer(m_elements->operator[](producerKey)[i], producerKey, consumers[i]); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| protected: | ||||
|     virtual bool existsProducer(const Producer *producer) = 0; | ||||
|     virtual bool existsConsumer(const Consumer *consumer) = 0; | ||||
|     virtual void sendMessageToConsumer(const Element *element, ElementPipesCommon::RegistrationKey<Producer> producerKey, Consumer *consumer) = 0; | ||||
| 
 | ||||
| private: | ||||
|     QMutex *m_mutex; | ||||
|     QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Element*>> *m_elements; | ||||
|     QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Consumer*>> *m_consumers; | ||||
|     QList<Element*> m_elementsToDelete; | ||||
| }; | ||||
| 
 | ||||
| 
 | ||||
| #endif // SDRBASE_PIPES_ELEMNTPIPESGCWORKER_H_
 | ||||
							
								
								
									
										135
									
								
								sdrbase/pipes/elementpipesregistrations.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										135
									
								
								sdrbase/pipes/elementpipesregistrations.h
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,135 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| // Copyright (C) 2020 Edouard Griffiths, F4EXB                                   //
 | ||||
| //                                                                               //
 | ||||
| // This program is free software; you can redistribute it and/or modify          //
 | ||||
| // it under the terms of the GNU General Public License as published by          //
 | ||||
| // the Free Software Foundation as version 3 of the License, or                  //
 | ||||
| // (at your option) any later version.                                           //
 | ||||
| //                                                                               //
 | ||||
| // This program is distributed in the hope that it will be useful,               //
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of                //
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the                  //
 | ||||
| // GNU General Public License V3 for more details.                               //
 | ||||
| //                                                                               //
 | ||||
| // You should have received a copy of the GNU General Public License             //
 | ||||
| // along with this program. If not, see <http://www.gnu.org/licenses/>.          //
 | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #ifndef SDRBASE_PIPES_ELEMNTPIPESREGISTRATION_H_ | ||||
| #define SDRBASE_PIPES_ELEMNTPIPESREGISTRATION_H_ | ||||
| 
 | ||||
| #include <QMap> | ||||
| #include <QHash> | ||||
| #include <QList> | ||||
| #include <QMutex> | ||||
| 
 | ||||
| #include "elementpipescommon.h" | ||||
| 
 | ||||
| template<typename Producer, typename Consumer, typename Element> | ||||
| class ElementPipesRegistrations | ||||
| { | ||||
| public: | ||||
|     ElementPipesRegistrations() : | ||||
|         m_typeCount(0), | ||||
|         m_mutex(QMutex::Recursive) | ||||
|     {} | ||||
| 
 | ||||
|     ~ElementPipesRegistrations() | ||||
|     { | ||||
|         typename QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Element*>>::iterator mit = m_elements.begin(); | ||||
| 
 | ||||
|         for (; mit != m_elements.end(); ++mit) | ||||
|         { | ||||
|             typename QList<Element*>::iterator elIt = mit->begin(); | ||||
| 
 | ||||
|             for (; elIt != mit->end(); ++elIt) { | ||||
|                 delete *elIt; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     Element *registerProducerToConsumer(const Producer *producer, Consumer *consumer, const QString& type) | ||||
|     { | ||||
|         int typeId; | ||||
|         QMutexLocker mlock(&m_mutex); | ||||
| 
 | ||||
|         if (m_typeIds.contains(type)) | ||||
|         { | ||||
|             typeId = m_typeIds.value(type); | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             typeId++; | ||||
|             m_typeIds.insert(type, typeId); | ||||
|         } | ||||
| 
 | ||||
|         const typename ElementPipesCommon::RegistrationKey<Producer> regKey | ||||
|             = ElementPipesCommon::RegistrationKey<Producer>{producer, typeId}; | ||||
|         Element *element; | ||||
| 
 | ||||
|         if (m_consumers[regKey].contains(consumer)) | ||||
|         { | ||||
|             int i = m_consumers[regKey].indexOf(consumer); | ||||
|             element = m_elements[regKey][i]; | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             element = new Element(); | ||||
|             m_elements[regKey].append(element); | ||||
|             m_consumers[regKey].append(consumer); | ||||
|         } | ||||
| 
 | ||||
|         return element; | ||||
|     } | ||||
| 
 | ||||
|     void unregisterProducerToConsumer(const Producer *producer, Consumer *consumer, const QString& type) | ||||
|     { | ||||
|         if (m_typeIds.contains(type)) | ||||
|         { | ||||
|             int typeId = m_typeIds.value(type); | ||||
|             const typename ElementPipesCommon::RegistrationKey<Producer> regKey | ||||
|                 = ElementPipesCommon::RegistrationKey<Producer>{producer, typeId}; | ||||
| 
 | ||||
|             if (m_consumers.contains(regKey) && m_consumers[regKey].contains(consumer)) | ||||
|             { | ||||
|                 QMutexLocker mlock(&m_mutex); | ||||
|                 int i = m_consumers[regKey].indexOf(consumer); | ||||
|                 m_consumers[regKey].removeAt(i); | ||||
|                 Element *element = m_elements[regKey][i]; | ||||
|                 delete element; | ||||
|                 m_elements[regKey].removeAt(i); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     QList<Element*>* getElements(const Producer *producer, const QString& type) | ||||
|     { | ||||
|         if (!m_typeIds.contains(type)) { | ||||
|             return nullptr; | ||||
|         } | ||||
| 
 | ||||
|         QMutexLocker mlock(&m_mutex); | ||||
|         const typename ElementPipesCommon::RegistrationKey<Producer> regKey | ||||
|             = ElementPipesCommon::RegistrationKey<Producer>{producer, m_typeIds.value(type)}; | ||||
| 
 | ||||
|         if (m_elements.contains(regKey)) { | ||||
|             return &m_elements[regKey]; | ||||
|         } else { | ||||
|             return nullptr; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Element*>> *getElements() { return &m_elements; } | ||||
|     QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Consumer*>>  *getConsumers() { return &m_consumers; } | ||||
|     QMutex *getMutex() { return &m_mutex; } | ||||
| 
 | ||||
| 
 | ||||
| private: | ||||
|     QHash<QString, int> m_typeIds; | ||||
|     int m_typeCount; | ||||
|     QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Element*>> m_elements; | ||||
|     QMap<ElementPipesCommon::RegistrationKey<Producer>, QList<Consumer*>> m_consumers; | ||||
|     QMutex m_mutex; | ||||
| }; | ||||
| 
 | ||||
| #endif // SDRBASE_PIPES_ELEMNTPIPESREGISTRATION_H_
 | ||||
| @ -17,15 +17,19 @@ | ||||
| 
 | ||||
| #include <QGlobalStatic> | ||||
| 
 | ||||
| #include "util/messagequeue.h" | ||||
| 
 | ||||
| #include "messagepipesgcworker.h" | ||||
| #include "messagepipes.h" | ||||
| 
 | ||||
| MessagePipes::MessagePipes() : | ||||
| 	m_typeCount(0), | ||||
| 	m_c2fMutex(QMutex::Recursive) | ||||
| MessagePipes::MessagePipes() | ||||
| { | ||||
| 	m_gcWorker = new MessagePipesGCWorker(); | ||||
| 	m_gcWorker->setC2FRegistrations(&m_c2fMutex, &m_c2fQueues, &m_c2fFEatures); | ||||
| 	m_gcWorker->setC2FRegistrations( | ||||
| 		m_registrations.getMutex(), | ||||
| 		m_registrations.getElements(), | ||||
| 		m_registrations.getConsumers() | ||||
| 	); | ||||
| 	m_gcWorker->moveToThread(&m_gcThread); | ||||
| 	startGC(); | ||||
| } | ||||
| @ -35,91 +39,26 @@ MessagePipes::~MessagePipes() | ||||
| 	if (m_gcWorker->isRunning()) { | ||||
| 		stopGC(); | ||||
| 	} | ||||
| 
 | ||||
| 	QMap<MessagePipesCommon::ChannelRegistrationKey, QList<MessageQueue*>>::iterator mit = m_c2fQueues.begin(); | ||||
| 
 | ||||
| 	for (; mit != m_c2fQueues.end(); ++mit) | ||||
| 	{ | ||||
| 		QList<MessageQueue*>::iterator lit = mit->begin(); | ||||
| 
 | ||||
| 		for (; lit != mit->end(); ++lit) { | ||||
| 			delete *lit; | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| MessageQueue *MessagePipes::registerChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type) | ||||
| { | ||||
| 	int typeId; | ||||
| 	QMutexLocker mlock(&m_c2fMutex); | ||||
| 
 | ||||
| 	if (m_typeIds.contains(type)) | ||||
| 	{ | ||||
| 		typeId = m_typeIds.value(type); | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		typeId++; | ||||
| 		m_typeIds.insert(type, typeId); | ||||
| 	} | ||||
| 
 | ||||
| 	const MessagePipesCommon::ChannelRegistrationKey regKey = MessagePipesCommon::ChannelRegistrationKey{source, typeId}; | ||||
| 	MessageQueue *messageQueue; | ||||
| 
 | ||||
| 	if (m_c2fFEatures[regKey].contains(feature)) | ||||
| 	{ | ||||
| 		int i = m_c2fFEatures[regKey].indexOf(feature); | ||||
| 		messageQueue = m_c2fQueues[regKey][i]; | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		messageQueue = new MessageQueue(); | ||||
| 		m_c2fQueues[regKey].append(messageQueue); | ||||
| 		m_c2fFEatures[regKey].append(feature); | ||||
| 	} | ||||
| 
 | ||||
| 	return messageQueue; | ||||
| 	return m_registrations.registerProducerToConsumer(source, feature, type); | ||||
| } | ||||
| 
 | ||||
| void MessagePipes::unregisterChannelToFeature(const ChannelAPI *source, Feature *feature, const QString& type) | ||||
| { | ||||
| 	if (m_typeIds.contains(type)) | ||||
| 	{ | ||||
| 		int typeId = m_typeIds.value(type); | ||||
| 		const MessagePipesCommon::ChannelRegistrationKey regKey = MessagePipesCommon::ChannelRegistrationKey{source, typeId}; | ||||
| 
 | ||||
| 		if (m_c2fFEatures.contains(regKey) && m_c2fFEatures[regKey].contains(feature)) | ||||
| 		{ | ||||
| 			QMutexLocker mlock(&m_c2fMutex); | ||||
| 			int i = m_c2fFEatures[regKey].indexOf(feature); | ||||
| 			m_c2fFEatures[regKey].removeAt(i); | ||||
| 			MessageQueue *messageQueue = m_c2fQueues[regKey][i]; | ||||
| 			delete messageQueue; | ||||
| 			m_c2fQueues[regKey].removeAt(i); | ||||
| 		} | ||||
| 	} | ||||
| 	m_registrations.unregisterProducerToConsumer(source, feature, type); | ||||
| } | ||||
| 
 | ||||
| QList<MessageQueue*>* MessagePipes::getMessageQueues(const ChannelAPI *source, const QString& type) | ||||
| { | ||||
| 	if (!m_typeIds.contains(type)) { | ||||
| 		return nullptr; | ||||
| 	} | ||||
| 
 | ||||
| 	QMutexLocker mlock(&m_c2fMutex); | ||||
| 	const MessagePipesCommon::ChannelRegistrationKey regKey = MessagePipesCommon::ChannelRegistrationKey{source, m_typeIds.value(type)}; | ||||
| 
 | ||||
| 	if (m_c2fQueues.contains(regKey)) { | ||||
| 		return &m_c2fQueues[regKey]; | ||||
| 	} else { | ||||
| 		return nullptr; | ||||
| 	} | ||||
| 	return m_registrations.getElements(source, type); | ||||
| } | ||||
| 
 | ||||
| void MessagePipes::startGC() | ||||
| { | ||||
| 	qDebug("MessagePipes::startGC"); | ||||
| 
 | ||||
|     m_gcWorker->startWork(); | ||||
|     m_gcThread.start(); | ||||
| } | ||||
|  | ||||
| @ -25,13 +25,14 @@ | ||||
| #include <QThread> | ||||
| 
 | ||||
| #include "export.h" | ||||
| #include "util/messagequeue.h" | ||||
| 
 | ||||
| #include "messagepipescommon.h" | ||||
| #include "elementpipesregistrations.h" | ||||
| 
 | ||||
| class ChannelAPI; | ||||
| class Feature; | ||||
| class MessagePipesGCWorker; | ||||
| class MessageQueue; | ||||
| 
 | ||||
| class SDRBASE_API MessagePipes : public QObject | ||||
| { | ||||
| @ -47,11 +48,7 @@ public: | ||||
|     QList<MessageQueue*>* getMessageQueues(const ChannelAPI *source, const QString& type); | ||||
| 
 | ||||
| private: | ||||
|     QHash<QString, int> m_typeIds; | ||||
|     int m_typeCount; | ||||
|     QMap<MessagePipesCommon::ChannelRegistrationKey, QList<MessageQueue*>> m_c2fQueues; | ||||
|     QMap<MessagePipesCommon::ChannelRegistrationKey, QList<Feature*>> m_c2fFEatures; | ||||
|     QMutex m_c2fMutex; | ||||
|     ElementPipesRegistrations<ChannelAPI, Feature, MessageQueue> m_registrations; | ||||
|     QThread m_gcThread; //!< Garbage collector thread
 | ||||
|     MessagePipesGCWorker *m_gcWorker; //!< Garbage collector
 | ||||
| 
 | ||||
|  | ||||
| @ -18,12 +18,3 @@ | ||||
| #include "messagepipescommon.h" | ||||
| 
 | ||||
| MESSAGE_CLASS_DEFINITION(MessagePipesCommon::MsgReportChannelDeleted, Message) | ||||
| 
 | ||||
| bool MessagePipesCommon::ChannelRegistrationKey::operator<(const ChannelRegistrationKey& other) const | ||||
| { | ||||
| 	if (m_channel !=  other.m_channel) { | ||||
| 		return m_channel < other.m_channel; | ||||
| 	} else { | ||||
| 		return m_typeId < other.m_typeId; | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @ -24,6 +24,7 @@ | ||||
| 
 | ||||
| #include "export.h" | ||||
| #include "util/message.h" | ||||
| #include "elementpipescommon.h" | ||||
| 
 | ||||
| class ChannelAPI; | ||||
| class Feature; | ||||
| @ -32,16 +33,7 @@ class MessageQueue; | ||||
| class SDRBASE_API MessagePipesCommon | ||||
| { | ||||
| public: | ||||
|     struct ChannelRegistrationKey | ||||
|     { | ||||
|         const ChannelAPI *m_channel; | ||||
|         int m_typeId; | ||||
| 
 | ||||
|         ChannelRegistrationKey() = default; | ||||
|         ChannelRegistrationKey(const ChannelRegistrationKey&) = default; | ||||
|         ChannelRegistrationKey& operator=(const ChannelRegistrationKey&) = default; | ||||
|         bool operator<(const ChannelRegistrationKey& other) const; | ||||
|     }; | ||||
|     typedef ElementPipesCommon::RegistrationKey<ChannelAPI> ChannelRegistrationKey; | ||||
| 
 | ||||
|     /** Send this message to stakeholders when the garbage collector finds that a channel was deleted */ | ||||
|     class SDRBASE_API MsgReportChannelDeleted : public Message { | ||||
|  | ||||
| @ -16,15 +16,33 @@ | ||||
| ///////////////////////////////////////////////////////////////////////////////////
 | ||||
| 
 | ||||
| #include "feature/feature.h" | ||||
| #include "util/messagequeue.h" | ||||
| #include "maincore.h" | ||||
| #include "messagepipescommon.h" | ||||
| #include "messagepipesgcworker.h" | ||||
| 
 | ||||
| bool MessagePipesGCWorker::MessagePipesGC::existsProducer(const ChannelAPI *channel) | ||||
| { | ||||
|     return MainCore::instance()->existsChannel(channel); | ||||
| } | ||||
| 
 | ||||
| bool MessagePipesGCWorker::MessagePipesGC::existsConsumer(const Feature *feature) | ||||
| { | ||||
|     return MainCore::instance()->existsFeature(feature); | ||||
| } | ||||
| 
 | ||||
| void MessagePipesGCWorker::MessagePipesGC::sendMessageToConsumer( | ||||
|     const MessageQueue *messageQueue, | ||||
|     MessagePipesCommon::ChannelRegistrationKey channelKey, | ||||
|     Feature *feature) | ||||
| { | ||||
|     MessagePipesCommon::MsgReportChannelDeleted *msg = MessagePipesCommon::MsgReportChannelDeleted::create( | ||||
|         messageQueue, channelKey); | ||||
|     feature->getInputMessageQueue()->push(msg); | ||||
| } | ||||
| 
 | ||||
| MessagePipesGCWorker::MessagePipesGCWorker() : | ||||
|     m_running(false), | ||||
|     m_c2fMutex(nullptr), | ||||
|     m_c2fQueues(nullptr), | ||||
|     m_c2fFeatures(nullptr) | ||||
|     m_running(false) | ||||
| {} | ||||
| 
 | ||||
| MessagePipesGCWorker::~MessagePipesGCWorker() | ||||
| @ -47,78 +65,5 @@ void MessagePipesGCWorker::stopWork() | ||||
| void MessagePipesGCWorker::processGC() | ||||
| { | ||||
|     // qDebug("MessagePipesGCWorker::processGC");
 | ||||
|     if (m_c2fMutex) | ||||
|     { | ||||
|         QMutexLocker mlock(m_c2fMutex); | ||||
|         QMap<MessagePipesCommon::ChannelRegistrationKey, QList<Feature*>>::iterator fIt = m_c2fFeatures->begin(); | ||||
| 
 | ||||
|         // remove queues to be deleted from last run
 | ||||
|         for (QList<MessageQueue*>::iterator dqIt = m_c2fQueuesToDelete.begin(); dqIt != m_c2fQueuesToDelete.end(); ++dqIt) { | ||||
|             delete *dqIt; | ||||
|         } | ||||
| 
 | ||||
|         m_c2fQueuesToDelete.clear(); | ||||
| 
 | ||||
|         // remove keys with empty features
 | ||||
|         fIt = m_c2fFeatures->begin(); | ||||
| 
 | ||||
|         while (fIt != m_c2fFeatures->end()) | ||||
|         { | ||||
|             if (fIt.value().size() == 0) { | ||||
|                 fIt = m_c2fFeatures->erase(fIt); | ||||
|             } else { | ||||
|                 ++fIt; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         // remove keys with empty message queues
 | ||||
|         QMap<MessagePipesCommon::ChannelRegistrationKey, QList<MessageQueue*>>::iterator qIt = m_c2fQueues->begin(); | ||||
| 
 | ||||
|         while (qIt != m_c2fQueues->end()) | ||||
|         { | ||||
|             if (qIt.value().size() == 0) { | ||||
|                 qIt = m_c2fQueues->erase(qIt); | ||||
|             } else { | ||||
|                 ++qIt; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         // check deleted channels and features
 | ||||
|         for (;fIt != m_c2fFeatures->end(); ++fIt) | ||||
|         { | ||||
|             MessagePipesCommon::ChannelRegistrationKey channelKey = fIt.key(); | ||||
|             const ChannelAPI *channel = channelKey.m_channel; | ||||
| 
 | ||||
|             if (MainCore::instance()->existsChannel(channel)) // look for deleted features
 | ||||
|             { | ||||
|                 QList<Feature*>& features = fIt.value(); | ||||
|                 int i = 0; | ||||
| 
 | ||||
|                 while (i < features.size()) | ||||
|                 { | ||||
|                     if (MainCore::instance()->existsFeature(features[i])) { | ||||
|                         i++; | ||||
|                     } | ||||
|                     else | ||||
|                     { | ||||
|                         features.removeAt(i); | ||||
|                         MessageQueue *messageQueue = m_c2fQueues->operator[](channelKey)[i]; | ||||
|                         m_c2fQueuesToDelete.append(messageQueue); | ||||
|                         m_c2fQueues->operator[](channelKey).removeAt(i); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             else // channel was destroyed
 | ||||
|             { | ||||
|                 QList<Feature*>& features = fIt.value(); | ||||
| 
 | ||||
|                 for (int i = 0; i < features.size(); i++) | ||||
|                 { | ||||
|                     MessagePipesCommon::MsgReportChannelDeleted *msg = MessagePipesCommon::MsgReportChannelDeleted::create( | ||||
|                         m_c2fQueues->operator[](channelKey)[i], channelKey); | ||||
|                     features[i]->getInputMessageQueue()->push(msg); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|     m_messagePipesGC.processGC(); | ||||
| } | ||||
|  | ||||
| @ -24,6 +24,7 @@ | ||||
| #include "export.h" | ||||
| 
 | ||||
| #include "messagepipescommon.h" | ||||
| #include "elementpipesgc.h" | ||||
| 
 | ||||
| class QMutex; | ||||
| 
 | ||||
| @ -40,9 +41,7 @@ public: | ||||
|         QMap<MessagePipesCommon::ChannelRegistrationKey, QList<Feature*>> *c2fFeatures | ||||
|     ) | ||||
|     { | ||||
|         m_c2fMutex = c2fMutex; | ||||
|         m_c2fQueues = c2fQueues; | ||||
|         m_c2fFeatures = c2fFeatures; | ||||
|         m_messagePipesGC.setRegistrations(c2fMutex, c2fQueues, c2fFeatures); | ||||
|     } | ||||
| 
 | ||||
|     void startWork(); | ||||
| @ -50,11 +49,16 @@ public: | ||||
|     bool isRunning() const { return m_running; } | ||||
| 
 | ||||
| private: | ||||
|     class MessagePipesGC : public ElementPipesGC<ChannelAPI, Feature, MessageQueue> | ||||
|     { | ||||
|     private: | ||||
|         virtual bool existsProducer(const ChannelAPI *channelAPI); | ||||
|         virtual bool existsConsumer(const Feature *feature); | ||||
|         virtual void sendMessageToConsumer(const MessageQueue *messageQueue,  MessagePipesCommon::ChannelRegistrationKey key, Feature *feature); | ||||
|     }; | ||||
| 
 | ||||
|     MessagePipesGC m_messagePipesGC; | ||||
|     bool m_running; | ||||
|     QMutex *m_c2fMutex; | ||||
|     QMap<MessagePipesCommon::ChannelRegistrationKey, QList<MessageQueue*>> *m_c2fQueues; | ||||
|     QMap<MessagePipesCommon::ChannelRegistrationKey, QList<Feature*>> *m_c2fFeatures; | ||||
|     QList<MessageQueue*> m_c2fQueuesToDelete; | ||||
|     QTimer m_gcTimer; | ||||
| 
 | ||||
| private slots: | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user