diff --git a/plugins/samplesource/sdrdaemon/CMakeLists.txt b/plugins/samplesource/sdrdaemon/CMakeLists.txt index 2fc7bb122..2258a55a1 100644 --- a/plugins/samplesource/sdrdaemon/CMakeLists.txt +++ b/plugins/samplesource/sdrdaemon/CMakeLists.txt @@ -8,6 +8,7 @@ set(sdrdaemon_SOURCES sdrdaemoninput.cpp sdrdaemonplugin.cpp sdrdaemonthread.cpp + sdrdaemonudphandler.cpp ) set(sdrdaemon_HEADERS @@ -16,6 +17,7 @@ set(sdrdaemon_HEADERS sdrdaemoninput.h sdrdaemonplugin.h sdrdaemonthread.h + sdrdaemonudphandler.h ) set(sdrdaemon_FORMS diff --git a/plugins/samplesource/sdrdaemon/sdrdaemongui.cpp b/plugins/samplesource/sdrdaemon/sdrdaemongui.cpp index 3f5ebe5ea..6cccf59ea 100644 --- a/plugins/samplesource/sdrdaemon/sdrdaemongui.cpp +++ b/plugins/samplesource/sdrdaemon/sdrdaemongui.cpp @@ -156,7 +156,8 @@ bool SDRdaemonGui::handleMessage(const Message& message) } else if (SDRdaemonInput::MsgReportSDRdaemonStreamTiming::match(message)) { - m_samplesCount = ((SDRdaemonInput::MsgReportSDRdaemonStreamTiming&)message).getSamplesCount(); + m_startingTimeStamp.tv_sec = ((SDRdaemonInput::MsgReportSDRdaemonStreamTiming&)message).get_tv_sec(); + m_startingTimeStamp.tv_usec = ((SDRdaemonInput::MsgReportSDRdaemonStreamTiming&)message).get_tv_usec(); updateWithStreamTime(); return true; } diff --git a/plugins/samplesource/sdrdaemon/sdrdaemoninput.cpp b/plugins/samplesource/sdrdaemon/sdrdaemoninput.cpp index 5aab189cb..e0693390b 100644 --- a/plugins/samplesource/sdrdaemon/sdrdaemoninput.cpp +++ b/plugins/samplesource/sdrdaemon/sdrdaemoninput.cpp @@ -25,46 +25,10 @@ #include "sdrdaemongui.h" #include "sdrdaemoninput.h" -#include "sdrdaemonthread.h" +#include "sdrdaemonudphandler.h" MESSAGE_CLASS_DEFINITION(SDRdaemonInput::MsgConfigureSDRdaemonUDPLink, Message) MESSAGE_CLASS_DEFINITION(SDRdaemonInput::MsgConfigureSDRdaemonWork, Message) -void SDRdaemonInput::updateLink(const QString& address, quint16 port) -{ - QMutexLocker mutexLocker(&m_mutex); - bool wasRunning = false; - - if ((m_address != address) || (m_port != port)) - { - if (m_SDRdaemonThread != 0) - { - wasRunning = m_SDRdaemonThread->isRunning(); - - if (wasRunning) - { - m_SDRdaemonThread->stopWork(); - } - } - - if (m_SDRdaemonThread != 0) - { - m_SDRdaemonThread->updateLink(address, port); - - if (wasRunning) - { - m_SDRdaemonThread->startWork(); - } - } - - m_address = address; - m_port = port; - - qDebug() << "SDRdaemonInput::updateLink:" - << " address: " << m_address.toStdString().c_str() - << "port: " << m_port; - } -} - MESSAGE_CLASS_DEFINITION(SDRdaemonInput::MsgConfigureSDRdaemonStreamTiming, Message) MESSAGE_CLASS_DEFINITION(SDRdaemonInput::MsgReportSDRdaemonAcquisition, Message) MESSAGE_CLASS_DEFINITION(SDRdaemonInput::MsgReportSDRdaemonStreamData, Message) @@ -73,71 +37,43 @@ MESSAGE_CLASS_DEFINITION(SDRdaemonInput::MsgReportSDRdaemonStreamTiming, Message SDRdaemonInput::SDRdaemonInput(const QTimer& masterTimer) : m_address("127.0.0.1"), m_port(9090), - m_SDRdaemonThread(0), + m_SDRdaemonUDPHandler(0), m_deviceDescription(), m_sampleRate(0), m_centerFrequency(0), m_startingTimeStamp(0), m_masterTimer(masterTimer) { + m_sampleFifo.setSize(96000 * 4); + m_SDRdaemonUDPHandler = new SDRdaemonUDPHandler(&m_sampleFifo, getOutputMessageQueueToGUI()); + m_SDRdaemonUDPHandler->connectTimer(m_masterTimer); } SDRdaemonInput::~SDRdaemonInput() { stop(); + delete m_SDRdaemonUDPHandler; } bool SDRdaemonInput::init(const Message& message) { + qDebug() << "SDRdaemonInput::init"; return false; } bool SDRdaemonInput::start(int device) { - QMutexLocker mutexLocker(&m_mutex); - qDebug() << "SDRdaemonInput::startInput"; - - if (!m_sampleFifo.setSize(96000 * 4)) { - qCritical("Could not allocate SampleFifo"); - return false; - } - - if ((m_SDRdaemonThread = new SDRdaemonThread(&m_sampleFifo, getOutputMessageQueueToGUI())) == NULL) { - qFatal("out of memory"); - stop(); - return false; - } - - m_SDRdaemonThread->connectTimer(m_masterTimer); - m_SDRdaemonThread->startWork(); - m_deviceDescription = "SDRdaemon"; - - mutexLocker.unlock(); - //applySettings(m_generalSettings, m_settings, true); - qDebug("SDRdaemonInput::startInput: started"); - - MsgReportSDRdaemonAcquisition *report = MsgReportSDRdaemonAcquisition::create(true); // acquisition on - getOutputMessageQueueToGUI()->push(report); - + qDebug() << "SDRdaemonInput::start"; + MsgConfigureSDRdaemonWork *command = MsgConfigureSDRdaemonWork::create(true); + getInputMessageQueue()->push(command); return true; } void SDRdaemonInput::stop() { qDebug() << "SDRdaemonInput::stop"; - QMutexLocker mutexLocker(&m_mutex); - - if(m_SDRdaemonThread != 0) - { - m_SDRdaemonThread->stopWork(); - delete m_SDRdaemonThread; - m_SDRdaemonThread = 0; - } - - m_deviceDescription.clear(); - - MsgReportSDRdaemonAcquisition *report = MsgReportSDRdaemonAcquisition::create(false); // acquisition off - getOutputMessageQueueToGUI()->push(report); + MsgConfigureSDRdaemonWork *command = MsgConfigureSDRdaemonWork::create(false); + getInputMessageQueue()->push(command); } const QString& SDRdaemonInput::getDeviceDescription() const @@ -164,8 +100,6 @@ bool SDRdaemonInput::handleMessage(const Message& message) { if (MsgConfigureSDRdaemonUDPLink::match(message)) { - MsgConfigureSDRdaemonUDPLink& conf = (MsgConfigureSDRdaemonUDPLink&) message; - updateLink(conf.getAddress(), conf.getPort()); return true; } else if (MsgConfigureSDRdaemonWork::match(message)) @@ -173,33 +107,16 @@ bool SDRdaemonInput::handleMessage(const Message& message) MsgConfigureSDRdaemonWork& conf = (MsgConfigureSDRdaemonWork&) message; bool working = conf.isWorking(); - if (m_SDRdaemonThread != 0) - { - if (working) - { - m_SDRdaemonThread->startWork(); - MsgReportSDRdaemonStreamTiming *report = - MsgReportSDRdaemonStreamTiming::create(m_SDRdaemonThread->getSamplesCount()); - getOutputMessageQueueToGUI()->push(report); - } - else - { - m_SDRdaemonThread->stopWork(); - } + if (working) { + m_SDRdaemonUDPHandler->start(); + } else { + m_SDRdaemonUDPHandler->stop(); } return true; } else if (MsgConfigureSDRdaemonStreamTiming::match(message)) { - MsgReportSDRdaemonStreamTiming *report; - - if (m_SDRdaemonThread != 0) - { - report = MsgReportSDRdaemonStreamTiming::create(m_SDRdaemonThread->getSamplesCount()); - getOutputMessageQueueToGUI()->push(report); - } - return true; } else diff --git a/plugins/samplesource/sdrdaemon/sdrdaemoninput.h b/plugins/samplesource/sdrdaemon/sdrdaemoninput.h index 461a0190f..e17c8676c 100644 --- a/plugins/samplesource/sdrdaemon/sdrdaemoninput.h +++ b/plugins/samplesource/sdrdaemon/sdrdaemoninput.h @@ -24,7 +24,7 @@ #include #include -class SDRdaemonThread; +class SDRdaemonUDPHandler; class SDRdaemonInput : public SampleSource { public: @@ -141,19 +141,22 @@ public: MESSAGE_CLASS_DECLARATION public: - std::size_t getSamplesCount() const { return m_samplesCount; } + uint32_t get_tv_sec() const { return m_tv_sec; } + uint32_t get_tv_usec() const { return m_tv_usec; } - static MsgReportSDRdaemonStreamTiming* create(std::size_t samplesCount) + static MsgReportSDRdaemonStreamTiming* create(uint32_t tv_sec, uint32_t tv_usec) { - return new MsgReportSDRdaemonStreamTiming(samplesCount); + return new MsgReportSDRdaemonStreamTiming(tv_sec, tv_usec); } protected: - std::size_t m_samplesCount; + uint32_t m_tv_sec; + uint32_t m_tv_usec; - MsgReportSDRdaemonStreamTiming(std::size_t samplesCount) : + MsgReportSDRdaemonStreamTiming(uint32_t tv_sec, uint32_t tv_usec) : Message(), - m_samplesCount(samplesCount) + m_tv_sec(tv_sec), + m_tv_usec(tv_usec) { } }; @@ -175,14 +178,12 @@ private: QMutex m_mutex; QString m_address; quint16 m_port; - SDRdaemonThread* m_SDRdaemonThread; + SDRdaemonUDPHandler* m_SDRdaemonUDPHandler; QString m_deviceDescription; int m_sampleRate; quint64 m_centerFrequency; std::time_t m_startingTimeStamp; const QTimer& m_masterTimer; - - void updateLink(const QString& address, quint16 port); }; #endif // INCLUDE_SDRDAEMONINPUT_H diff --git a/plugins/samplesource/sdrdaemon/sdrdaemonudphandler.cpp b/plugins/samplesource/sdrdaemon/sdrdaemonudphandler.cpp new file mode 100644 index 000000000..f1ce3082c --- /dev/null +++ b/plugins/samplesource/sdrdaemon/sdrdaemonudphandler.cpp @@ -0,0 +1,209 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2015 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include +#include "sdrdaemonudphandler.h" +#include "sdrdaemoninput.h" + +const int SDRdaemonUDPHandler::m_rateDivider = 1000/SDRDAEMON_THROTTLE_MS; +const int SDRdaemonUDPHandler::m_udpPayloadSize = 512; + +SDRdaemonUDPHandler::SDRdaemonUDPHandler(SampleFifo *sampleFifo, MessageQueue *outputMessageQueueToGUI) : + m_mutex(QMutex::Recursive), + m_sdrDaemonBuffer(m_udpPayloadSize), + m_dataSocket(0), + m_dataAddress(QHostAddress::LocalHost), + m_dataPort(9090), + m_dataConnected(false), + m_udpBuf(0), + m_udpReadBytes(0), + m_buf(0), + m_bufsize(0), + m_chunksize(0), + m_sampleFifo(sampleFifo), + m_samplerate(0), + m_centerFrequency(0), + m_tv_sec(0), + m_tv_usec(0), + m_outputMessageQueueToGUI(outputMessageQueueToGUI), + m_tickCount(0), + m_samplesCount(0) +{ + m_udpBuf = new char[m_udpPayloadSize]; +} + +SDRdaemonUDPHandler::~SDRdaemonUDPHandler() +{ + stop(); + delete[] m_udpBuf; +} + +void SDRdaemonUDPHandler::start() +{ + if (!m_dataSocket) + { + m_dataSocket = new QUdpSocket(this); + } + + if (!m_dataConnected) + { + if (m_dataSocket->bind(m_dataAddress, m_dataPort)) + { + qDebug("SDRdaemonUDPHandler::start: bind data socket to port %d", m_dataPort); + connect(this, SIGNAL(dataReady()), this, SLOT(processData())); + connect(m_dataSocket, SIGNAL(readyRead()), this, SLOT(dataReadyRead()), Qt::QueuedConnection); // , Qt::QueuedConnection + m_dataConnected = true; + } + else + { + qWarning("SDRdaemonUDPHandler::start: cannot bind data port %d", m_dataPort); + m_dataConnected = false; + } + } +} + +void SDRdaemonUDPHandler::stop() +{ + qDebug("SDRdaemonUDPHandler::stop"); + + if (m_dataConnected) { + disconnect(m_dataSocket, SIGNAL(readyRead()), this, SLOT(dataReadyRead())); + disconnect(this, SIGNAL(dataReady()), this, SLOT(processData)); + m_dataConnected = false; + } + + if (m_dataSocket) + { + delete m_dataSocket; + m_dataSocket = 0; + } +} + +void SDRdaemonUDPHandler::dataReadyRead() +{ + //qDebug() << "SDRdaemonUDPHandler::dataReadyRead"; + + while (m_dataSocket->hasPendingDatagrams()) + { + qint64 pendingDataSize = m_dataSocket->pendingDatagramSize(); + m_udpReadBytes = m_dataSocket->readDatagram(m_udpBuf, pendingDataSize, 0, 0); + emit dataReady(); + } +} + +void SDRdaemonUDPHandler::processData() +{ + if (m_udpReadBytes < 0) + { + qDebug() << "SDRdaemonThread::processData: read failed"; + } + else if (m_udpReadBytes > 0) + { + QMutexLocker ml(&m_mutex); + + m_sdrDaemonBuffer.updateBlockCounts(m_udpReadBytes); + + if (m_sdrDaemonBuffer.readMeta(m_udpBuf, m_udpReadBytes)) + { + const SDRdaemonBuffer::MetaData& metaData = m_sdrDaemonBuffer.getCurrentMeta(); + bool change = false; + m_tv_sec = metaData.m_tv_sec; + m_tv_usec = metaData.m_tv_usec; + + if (m_samplerate != metaData.m_sampleRate) + { + setSamplerate(metaData.m_sampleRate); + m_samplerate = metaData.m_sampleRate; + change = true; + } + + if (m_centerFrequency != metaData.m_centerFrequency) + { + m_centerFrequency = metaData.m_centerFrequency; + change = true; + } + + if (change) + { + SDRdaemonInput::MsgReportSDRdaemonStreamData *report = SDRdaemonInput::MsgReportSDRdaemonStreamData::create( + metaData.m_sampleRate, + metaData.m_centerFrequency, + metaData.m_tv_sec, + metaData.m_tv_usec); + m_outputMessageQueueToGUI->push(report); + } + } + else if (m_sdrDaemonBuffer.isSync()) + { + m_sdrDaemonBuffer.writeData(m_udpBuf, m_udpReadBytes); + } + } +} + +void SDRdaemonUDPHandler::setSamplerate(uint32_t samplerate) +{ + qDebug() << "SDRdaemonUDPHandler::setSamplerate:" + << " new:" << samplerate + << " old:" << m_samplerate; + + QMutexLocker ml(&m_mutex); + + m_samplerate = samplerate; + m_chunksize = (m_samplerate / m_rateDivider)*4; // TODO: implement FF and slow motion here. 4 corresponds to live. 2 is half speed, 8 is doulbe speed + m_bufsize = m_chunksize; + + if (m_buf == 0) { + qDebug() << " - Allocate buffer"; + m_buf = (quint8*) malloc(m_bufsize); + } else { + qDebug() << " - Re-allocate buffer"; + m_buf = (quint8*) realloc((void*) m_buf, m_bufsize); + } + + qDebug() << "SDRdaemonUDPHandler::setSamplerate:" + << " size: " << m_bufsize + << " #samples: " << (m_bufsize/4); +} + +void SDRdaemonUDPHandler::connectTimer(const QTimer& timer) +{ + qDebug() << "SDRdaemonUDPHandler::connectTimer"; + connect(&timer, SIGNAL(timeout()), this, SLOT(tick())); +} + +void SDRdaemonUDPHandler::tick() +{ + // read samples directly feeding the SampleFifo (no callback) + m_sampleFifo->write(reinterpret_cast(m_sdrDaemonBuffer.readData(m_chunksize)), m_chunksize); + m_samplesCount += m_chunksize / 4; + + if (m_tickCount < m_rateDivider) + { + m_tickCount++; + } + else + { + m_tickCount = 0; + SDRdaemonInput::MsgReportSDRdaemonStreamTiming *report = SDRdaemonInput::MsgReportSDRdaemonStreamTiming::create( + m_tv_sec, + m_tv_usec); + m_outputMessageQueueToGUI->push(report); + } +} + + diff --git a/plugins/samplesource/sdrdaemon/sdrdaemonudphandler.h b/plugins/samplesource/sdrdaemon/sdrdaemonudphandler.h new file mode 100644 index 000000000..8f033cf01 --- /dev/null +++ b/plugins/samplesource/sdrdaemon/sdrdaemonudphandler.h @@ -0,0 +1,81 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2015 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef PLUGINS_SAMPLESOURCE_SDRDAEMON_SDRDAEMONUDPHANDLER_H_ +#define PLUGINS_SAMPLESOURCE_SDRDAEMON_SDRDAEMONUDPHANDLER_H_ + +#include +#include +#include +#include +#include "sdrdaemonbuffer.h" + +#define SDRDAEMON_THROTTLE_MS 50 + +class SampleFifo; +class MessageQueue; +class QTimer; + +class SDRdaemonUDPHandler : public QObject +{ + Q_OBJECT +public: + SDRdaemonUDPHandler(SampleFifo* sampleFifo, MessageQueue *outputMessageQueueToGUI); + ~SDRdaemonUDPHandler(); + void connectTimer(const QTimer& timer); + void start(); + void stop(); + +public slots: + void dataReadyRead(); + void processData(); + +signals: + void dataReady(); + +private: + QMutex m_mutex; + SDRdaemonBuffer m_sdrDaemonBuffer; + QUdpSocket *m_dataSocket; + QHostAddress m_dataAddress; + quint16 m_dataPort; + bool m_dataConnected; + char *m_udpBuf; + qint64 m_udpReadBytes; + quint8 *m_buf; + std::size_t m_bufsize; + std::size_t m_chunksize; + SampleFifo *m_sampleFifo; + uint32_t m_samplerate; + uint32_t m_centerFrequency; + uint32_t m_tv_sec; + uint32_t m_tv_usec; + MessageQueue *m_outputMessageQueueToGUI; + uint32_t m_tickCount; + std::size_t m_samplesCount; + + static const int m_rateDivider; + static const int m_udpPayloadSize; + + void setSamplerate(uint32_t samplerate); + +private slots: + void tick(); +}; + + + +#endif /* PLUGINS_SAMPLESOURCE_SDRDAEMON_SDRDAEMONUDPHANDLER_H_ */