SDRdaemon: channel source receiving data

This commit is contained in:
f4exb 2018-08-26 17:22:22 +02:00
parent 804c19904b
commit f30dcf7753
5 changed files with 131 additions and 15 deletions

View File

@ -27,6 +27,8 @@
#include "dsp/upchannelizer.h"
#include "device/devicesinkapi.h"
#include "sdrdaemonchannelsource.h"
#include "channel/sdrdaemonchannelsourcethread.h"
#include "channel/sdrdaemondatablock.h"
MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSource::MsgConfigureSDRDaemonChannelSource, Message)
@ -36,6 +38,7 @@ const QString SDRDaemonChannelSource::m_channelId = "SDRDaemonChannelSource";
SDRDaemonChannelSource::SDRDaemonChannelSource(DeviceSinkAPI *deviceAPI) :
ChannelSourceAPI(m_channelIdURI),
m_deviceAPI(deviceAPI),
m_sourceThread(0),
m_running(false),
m_samplesCount(0),
m_dataAddress("127.0.0.1"),
@ -47,6 +50,9 @@ SDRDaemonChannelSource::SDRDaemonChannelSource(DeviceSinkAPI *deviceAPI) :
m_threadedChannelizer = new ThreadedBasebandSampleSource(m_channelizer, this);
m_deviceAPI->addThreadedSource(m_threadedChannelizer);
m_deviceAPI->addChannelAPI(this);
connect(&m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection);
m_cm256p = m_cm256.isInitialized() ? &m_cm256 : 0;
}
SDRDaemonChannelSource::~SDRDaemonChannelSource()
@ -62,23 +68,34 @@ void SDRDaemonChannelSource::pull(Sample& sample)
sample.m_real = 0.0f;
sample.m_imag = 0.0f;
if (m_samplesCount < 1023) {
m_samplesCount++;
} else {
qDebug("SDRDaemonChannelSource::pull: 1024 samples pulled");
m_samplesCount = 0;
}
m_samplesCount++;
}
void SDRDaemonChannelSource::start()
{
qDebug("SDRDaemonChannelSink::start");
if (m_running) {
stop();
}
m_sourceThread = new SDRDaemonChannelSourceThread(&m_dataQueue, m_cm256p);
m_sourceThread->startStop(true);
m_sourceThread->dataBind(m_dataAddress, m_dataPort);
m_running = true;
}
void SDRDaemonChannelSource::stop()
{
qDebug("SDRDaemonChannelSink::stop");
if (m_sourceThread != 0)
{
m_sourceThread->startStop(false);
m_sourceThread->deleteLater();
m_sourceThread = 0;
}
m_running = false;
}
@ -127,15 +144,42 @@ void SDRDaemonChannelSource::applySettings(const SDRDaemonChannelSourceSettings&
<< " m_dataPort: " << settings.m_dataPort
<< " force: " << force;
if ((m_settings.m_dataAddress != settings.m_dataAddress) || force) {
bool change = false;
if ((m_settings.m_dataAddress != settings.m_dataAddress) || force)
{
m_dataAddress = settings.m_dataAddress;
change = true;
}
if ((m_settings.m_dataPort != settings.m_dataPort) || force) {
if ((m_settings.m_dataPort != settings.m_dataPort) || force)
{
m_dataPort = settings.m_dataPort;
change = true;
}
if (change && m_sourceThread) {
m_sourceThread->dataBind(m_dataAddress, m_dataPort);
}
m_settings = settings;
}
bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock __attribute__((unused)))
{
//TODO: Push into R/W buffer
return true;
}
void SDRDaemonChannelSource::handleData()
{
SDRDaemonDataBlock* dataBlock;
while (m_running && ((dataBlock = m_dataQueue.pop()) != 0))
{
if (handleDataBlock(*dataBlock))
{
delete dataBlock;
}
}
}

View File

@ -23,13 +23,18 @@
#ifndef SDRDAEMON_CHANNEL_SDRDAEMONCHANNELSOURCE_H_
#define SDRDAEMON_CHANNEL_SDRDAEMONCHANNELSOURCE_H_
#include "cm256.h"
#include "dsp/basebandsamplesource.h"
#include "channel/channelsourceapi.h"
#include "channel/sdrdaemonchannelsourcesettings.h"
#include "channel/sdrdaemondataqueue.h"
class ThreadedBasebandSampleSource;
class UpChannelizer;
class DeviceSinkAPI;
class SDRDaemonChannelSourceThread;
class SDRDaemonDataBlock;
class SDRDaemonChannelSource : public BasebandSampleSource, public ChannelSourceAPI {
Q_OBJECT
@ -80,15 +85,23 @@ private:
DeviceSinkAPI *m_deviceAPI;
ThreadedBasebandSampleSource* m_threadedChannelizer;
UpChannelizer* m_channelizer;
SDRDaemonDataQueue m_dataQueue;
SDRDaemonChannelSourceThread *m_sourceThread;
CM256 m_cm256;
CM256 *m_cm256p;
bool m_running;
SDRDaemonChannelSourceSettings m_settings;
uint32_t m_samplesCount;
uint64_t m_samplesCount;
QString m_dataAddress;
uint16_t m_dataPort;
void applySettings(const SDRDaemonChannelSourceSettings& settings, bool force = false);
bool handleDataBlock(SDRDaemonDataBlock& dataBlock);
private slots:
void handleData();
};

View File

@ -30,8 +30,8 @@ class Serializable;
struct SDRDaemonChannelSourceSettings
{
QString m_dataAddress;
uint16_t m_dataPort;
QString m_dataAddress; //!< Listening (local) data address
uint16_t m_dataPort; //!< Listening data port
SDRDaemonChannelSourceSettings();
void resetToDefaults();

View File

@ -29,6 +29,7 @@
#include "cm256.h"
MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSourceThread::MsgStartStop, Message)
MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSourceThread::MsgDataBind, Message)
SDRDaemonChannelSourceThread::SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent) :
QThread(parent),
@ -39,7 +40,6 @@ SDRDaemonChannelSourceThread::SDRDaemonChannelSourceThread(SDRDaemonDataQueue *d
m_socket(0)
{
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
connect(m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection);
}
SDRDaemonChannelSourceThread::~SDRDaemonChannelSourceThread()
@ -53,6 +53,12 @@ void SDRDaemonChannelSourceThread::startStop(bool start)
m_inputMessageQueue.push(msg);
}
void SDRDaemonChannelSourceThread::dataBind(const QString& address, uint16_t port)
{
MsgDataBind *msg = MsgDataBind::create(address, port);
m_inputMessageQueue.push(msg);
}
void SDRDaemonChannelSourceThread::startWork()
{
qDebug("SDRDaemonChannelSourceThread::startWork");
@ -108,5 +114,34 @@ void SDRDaemonChannelSourceThread::handleInputMessages()
delete message;
}
else if (MsgDataBind::match(*message))
{
MsgDataBind* notif = (MsgDataBind*) message;
qDebug("SDRDaemonChannelSourceThread::handleInputMessages: MsgDataBind: %s:%d", qPrintable(notif->getAddress().toString()), notif->getPort());
if (m_socket)
{
disconnect(m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams()));
m_socket->bind(notif->getAddress(), notif->getPort());
connect(m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams()));
}
}
}
}
void SDRDaemonChannelSourceThread::readPendingDatagrams()
{
char data[1024];
while (m_socket->hasPendingDatagrams())
{
QHostAddress sender;
quint16 senderPort = 0;
qint64 pendingDataSize = m_socket->pendingDatagramSize();
m_socket->readDatagram(data, pendingDataSize, &sender, &senderPort);
qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: %lld bytes received from %s:%d",
pendingDataSize,
qPrintable(sender.toString()),
senderPort);
}
}

View File

@ -58,24 +58,47 @@ public:
{ }
};
class MsgDataBind : public Message {
MESSAGE_CLASS_DECLARATION
public:
QHostAddress getAddress() const { return m_address; }
uint16_t getPort() const { return m_port; }
static MsgDataBind* create(const QString& address, uint16_t port) {
return new MsgDataBind(address, port);
}
protected:
QHostAddress m_address;
uint16_t m_port;
MsgDataBind(const QString& address, uint16_t port) :
Message(),
m_port(port)
{
m_address.setAddress(address);
}
};
SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent = 0);
~SDRDaemonChannelSourceThread();
void startStop(bool start);
void dataBind(const QString& address, uint16_t port);
private:
QMutex m_startWaitMutex;
QWaitCondition m_startWaiter;
bool m_running;
MessageQueue m_inputMessageQueue;
SDRDaemonDataQueue *m_dataQueue;
CM256 *m_cm256; //!< CM256 library object
QHostAddress m_address;
QUdpSocket *m_socket;
MessageQueue m_inputMessageQueue;
void startWork();
void stopWork();
@ -83,6 +106,7 @@ private:
private slots:
void handleInputMessages();
void readPendingDatagrams();
};