diff --git a/sdrdaemon/channel/sdrdaemonchannelsource.cpp b/sdrdaemon/channel/sdrdaemonchannelsource.cpp index a21846337..e16e6baf9 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsource.cpp +++ b/sdrdaemon/channel/sdrdaemonchannelsource.cpp @@ -27,6 +27,8 @@ #include "dsp/upchannelizer.h" #include "device/devicesinkapi.h" #include "sdrdaemonchannelsource.h" +#include "channel/sdrdaemonchannelsourcethread.h" +#include "channel/sdrdaemondatablock.h" MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSource::MsgConfigureSDRDaemonChannelSource, Message) @@ -36,6 +38,7 @@ const QString SDRDaemonChannelSource::m_channelId = "SDRDaemonChannelSource"; SDRDaemonChannelSource::SDRDaemonChannelSource(DeviceSinkAPI *deviceAPI) : ChannelSourceAPI(m_channelIdURI), m_deviceAPI(deviceAPI), + m_sourceThread(0), m_running(false), m_samplesCount(0), m_dataAddress("127.0.0.1"), @@ -47,6 +50,9 @@ SDRDaemonChannelSource::SDRDaemonChannelSource(DeviceSinkAPI *deviceAPI) : m_threadedChannelizer = new ThreadedBasebandSampleSource(m_channelizer, this); m_deviceAPI->addThreadedSource(m_threadedChannelizer); m_deviceAPI->addChannelAPI(this); + + connect(&m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection); + m_cm256p = m_cm256.isInitialized() ? &m_cm256 : 0; } SDRDaemonChannelSource::~SDRDaemonChannelSource() @@ -62,23 +68,34 @@ void SDRDaemonChannelSource::pull(Sample& sample) sample.m_real = 0.0f; sample.m_imag = 0.0f; - if (m_samplesCount < 1023) { - m_samplesCount++; - } else { - qDebug("SDRDaemonChannelSource::pull: 1024 samples pulled"); - m_samplesCount = 0; - } + m_samplesCount++; } void SDRDaemonChannelSource::start() { qDebug("SDRDaemonChannelSink::start"); + + if (m_running) { + stop(); + } + + m_sourceThread = new SDRDaemonChannelSourceThread(&m_dataQueue, m_cm256p); + m_sourceThread->startStop(true); + m_sourceThread->dataBind(m_dataAddress, m_dataPort); m_running = true; } void SDRDaemonChannelSource::stop() { qDebug("SDRDaemonChannelSink::stop"); + + if (m_sourceThread != 0) + { + m_sourceThread->startStop(false); + m_sourceThread->deleteLater(); + m_sourceThread = 0; + } + m_running = false; } @@ -127,15 +144,42 @@ void SDRDaemonChannelSource::applySettings(const SDRDaemonChannelSourceSettings& << " m_dataPort: " << settings.m_dataPort << " force: " << force; - if ((m_settings.m_dataAddress != settings.m_dataAddress) || force) { + bool change = false; + + if ((m_settings.m_dataAddress != settings.m_dataAddress) || force) + { m_dataAddress = settings.m_dataAddress; + change = true; } - if ((m_settings.m_dataPort != settings.m_dataPort) || force) { + if ((m_settings.m_dataPort != settings.m_dataPort) || force) + { m_dataPort = settings.m_dataPort; + change = true; + } + + if (change && m_sourceThread) { + m_sourceThread->dataBind(m_dataAddress, m_dataPort); } m_settings = settings; } +bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock __attribute__((unused))) +{ + //TODO: Push into R/W buffer + return true; +} +void SDRDaemonChannelSource::handleData() +{ + SDRDaemonDataBlock* dataBlock; + + while (m_running && ((dataBlock = m_dataQueue.pop()) != 0)) + { + if (handleDataBlock(*dataBlock)) + { + delete dataBlock; + } + } +} diff --git a/sdrdaemon/channel/sdrdaemonchannelsource.h b/sdrdaemon/channel/sdrdaemonchannelsource.h index 7259b2827..be1e73e71 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsource.h +++ b/sdrdaemon/channel/sdrdaemonchannelsource.h @@ -23,13 +23,18 @@ #ifndef SDRDAEMON_CHANNEL_SDRDAEMONCHANNELSOURCE_H_ #define SDRDAEMON_CHANNEL_SDRDAEMONCHANNELSOURCE_H_ +#include "cm256.h" + #include "dsp/basebandsamplesource.h" #include "channel/channelsourceapi.h" #include "channel/sdrdaemonchannelsourcesettings.h" +#include "channel/sdrdaemondataqueue.h" class ThreadedBasebandSampleSource; class UpChannelizer; class DeviceSinkAPI; +class SDRDaemonChannelSourceThread; +class SDRDaemonDataBlock; class SDRDaemonChannelSource : public BasebandSampleSource, public ChannelSourceAPI { Q_OBJECT @@ -80,15 +85,23 @@ private: DeviceSinkAPI *m_deviceAPI; ThreadedBasebandSampleSource* m_threadedChannelizer; UpChannelizer* m_channelizer; - + SDRDaemonDataQueue m_dataQueue; + SDRDaemonChannelSourceThread *m_sourceThread; + CM256 m_cm256; + CM256 *m_cm256p; bool m_running; + SDRDaemonChannelSourceSettings m_settings; - uint32_t m_samplesCount; + uint64_t m_samplesCount; QString m_dataAddress; uint16_t m_dataPort; void applySettings(const SDRDaemonChannelSourceSettings& settings, bool force = false); + bool handleDataBlock(SDRDaemonDataBlock& dataBlock); + +private slots: + void handleData(); }; diff --git a/sdrdaemon/channel/sdrdaemonchannelsourcesettings.h b/sdrdaemon/channel/sdrdaemonchannelsourcesettings.h index ebd26735f..7cd4268b6 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsourcesettings.h +++ b/sdrdaemon/channel/sdrdaemonchannelsourcesettings.h @@ -30,8 +30,8 @@ class Serializable; struct SDRDaemonChannelSourceSettings { - QString m_dataAddress; - uint16_t m_dataPort; + QString m_dataAddress; //!< Listening (local) data address + uint16_t m_dataPort; //!< Listening data port SDRDaemonChannelSourceSettings(); void resetToDefaults(); diff --git a/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp b/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp index 5498b9a84..f644a9523 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp +++ b/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp @@ -29,6 +29,7 @@ #include "cm256.h" MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSourceThread::MsgStartStop, Message) +MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSourceThread::MsgDataBind, Message) SDRDaemonChannelSourceThread::SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent) : QThread(parent), @@ -39,7 +40,6 @@ SDRDaemonChannelSourceThread::SDRDaemonChannelSourceThread(SDRDaemonDataQueue *d m_socket(0) { connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); - connect(m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection); } SDRDaemonChannelSourceThread::~SDRDaemonChannelSourceThread() @@ -53,6 +53,12 @@ void SDRDaemonChannelSourceThread::startStop(bool start) m_inputMessageQueue.push(msg); } +void SDRDaemonChannelSourceThread::dataBind(const QString& address, uint16_t port) +{ + MsgDataBind *msg = MsgDataBind::create(address, port); + m_inputMessageQueue.push(msg); +} + void SDRDaemonChannelSourceThread::startWork() { qDebug("SDRDaemonChannelSourceThread::startWork"); @@ -108,5 +114,34 @@ void SDRDaemonChannelSourceThread::handleInputMessages() delete message; } + else if (MsgDataBind::match(*message)) + { + MsgDataBind* notif = (MsgDataBind*) message; + qDebug("SDRDaemonChannelSourceThread::handleInputMessages: MsgDataBind: %s:%d", qPrintable(notif->getAddress().toString()), notif->getPort()); + + if (m_socket) + { + disconnect(m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams())); + m_socket->bind(notif->getAddress(), notif->getPort()); + connect(m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams())); + } + } + } +} + +void SDRDaemonChannelSourceThread::readPendingDatagrams() +{ + char data[1024]; + while (m_socket->hasPendingDatagrams()) + { + QHostAddress sender; + quint16 senderPort = 0; + qint64 pendingDataSize = m_socket->pendingDatagramSize(); + m_socket->readDatagram(data, pendingDataSize, &sender, &senderPort); + qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: %lld bytes received from %s:%d", + pendingDataSize, + qPrintable(sender.toString()), + senderPort); + } } diff --git a/sdrdaemon/channel/sdrdaemonchannelsourcethread.h b/sdrdaemon/channel/sdrdaemonchannelsourcethread.h index e7f89e8ce..de4767578 100644 --- a/sdrdaemon/channel/sdrdaemonchannelsourcethread.h +++ b/sdrdaemon/channel/sdrdaemonchannelsourcethread.h @@ -58,24 +58,47 @@ public: { } }; + class MsgDataBind : public Message { + MESSAGE_CLASS_DECLARATION + + public: + QHostAddress getAddress() const { return m_address; } + uint16_t getPort() const { return m_port; } + + static MsgDataBind* create(const QString& address, uint16_t port) { + return new MsgDataBind(address, port); + } + + protected: + QHostAddress m_address; + uint16_t m_port; + + MsgDataBind(const QString& address, uint16_t port) : + Message(), + m_port(port) + { + m_address.setAddress(address); + } + }; + SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent = 0); ~SDRDaemonChannelSourceThread(); void startStop(bool start); + void dataBind(const QString& address, uint16_t port); private: QMutex m_startWaitMutex; QWaitCondition m_startWaiter; bool m_running; + MessageQueue m_inputMessageQueue; SDRDaemonDataQueue *m_dataQueue; CM256 *m_cm256; //!< CM256 library object QHostAddress m_address; QUdpSocket *m_socket; - MessageQueue m_inputMessageQueue; - void startWork(); void stopWork(); @@ -83,6 +106,7 @@ private: private slots: void handleInputMessages(); + void readPendingDatagrams(); };