diff --git a/plugins/samplesource/sdrdaemon/sdrdaemonbuffer.cpp b/plugins/samplesource/sdrdaemon/sdrdaemonbuffer.cpp index 8017e630f..e81b4863e 100644 --- a/plugins/samplesource/sdrdaemon/sdrdaemonbuffer.cpp +++ b/plugins/samplesource/sdrdaemon/sdrdaemonbuffer.cpp @@ -38,6 +38,7 @@ SDRdaemonBuffer::SDRdaemonBuffer(std::size_t blockSize) : m_rawBuffer(0) { m_buf = new uint8_t[blockSize]; + updateBufferSize(); m_currentMeta.init(); } @@ -50,19 +51,18 @@ SDRdaemonBuffer::~SDRdaemonBuffer() delete[] m_buf; } -bool SDRdaemonBuffer::writeAndRead(uint8_t *array, std::size_t length, uint8_t *data, std::size_t& dataLength) +bool SDRdaemonBuffer::readMeta(char *array, std::size_t length) { - assert(length == m_blockSize); // TODO: allow fragmented blocks with larger size + assert(length >= sizeof(MetaData) + 8); MetaData *metaData = (MetaData *) array; - if (m_crc64.calculate_crc(array, sizeof(MetaData) - 8) == metaData->m_crc) + if (m_crc64.calculate_crc((uint8_t *)array, sizeof(MetaData) - 8) == metaData->m_crc) { - dataLength = 0; memcpy((void *) &m_dataCRC, (const void *) &array[sizeof(MetaData)], 8); if (!(m_currentMeta == *metaData)) { - std::cerr << "SDRdaemonBuffer::writeAndRead: "; + std::cerr << "SDRdaemonBuffer::readMeta: "; printMeta(metaData); } @@ -74,7 +74,7 @@ bool SDRdaemonBuffer::writeAndRead(uint8_t *array, std::size_t length, uint8_t * if (metaData->m_sampleBytes & 0x10) { m_lz4 = true; - updateSizes(metaData); + updateLZ4Sizes(metaData); } else { @@ -87,38 +87,30 @@ bool SDRdaemonBuffer::writeAndRead(uint8_t *array, std::size_t length, uint8_t * { m_sync = false; } - - return false; } - else + + return m_sync; +} + +void SDRdaemonBuffer::writeData(char *array, std::size_t length) +{ + if (m_sync) { - if (m_sync) + if (m_lz4) { - if (m_lz4) - { - return writeAndReadLZ4(array, length, data, dataLength); - } - else - { - std::memcpy((void *) data, (const void *) array, length); - dataLength = length; - return true; - } + writeDataLZ4(array, length); } else { - dataLength = 0; - return false; + // TODO: uncompressed case } } } -bool SDRdaemonBuffer::writeAndReadLZ4(uint8_t *array, std::size_t length, uint8_t *data, std::size_t& dataLength) +void SDRdaemonBuffer::writeDataLZ4(char *array, std::size_t length) { if (m_lz4InCount + length < m_lz4InSize) { - std::memcpy((void *) &m_lz4InBuffer[m_lz4InCount], (const void *) array, length); // copy data in compressed Buffer - dataLength = 0; m_lz4InCount += length; } else @@ -162,8 +154,6 @@ bool SDRdaemonBuffer::writeAndReadLZ4(uint8_t *array, std::size_t length, uint8_ << " out: " << m_lz4OutSize << std::endl; */ - std::memcpy((void *) data, (const void *) m_lz4OutBuffer, m_lz4OutSize); // send what is in buffer - dataLength = m_lz4OutSize; m_nbSuccessfulDecodes++; } else @@ -176,8 +166,6 @@ bool SDRdaemonBuffer::writeAndReadLZ4(uint8_t *array, std::size_t length, uint8_ //if (compressedSize > 0) //{ - std::memcpy((void *) data, (const void *) m_lz4OutBuffer, m_lz4OutSize); // send what is in buffer - dataLength = m_lz4OutSize; //} //else //{ @@ -187,11 +175,9 @@ bool SDRdaemonBuffer::writeAndReadLZ4(uint8_t *array, std::size_t length, uint8_ m_lz4InCount = 0; } - - return dataLength != 0; } -void SDRdaemonBuffer::updateSizes(MetaData *metaData) +void SDRdaemonBuffer::updateLZ4Sizes(MetaData *metaData) { m_lz4InSize = metaData->m_nbBytes; // compressed input size uint32_t sampleBytes = metaData->m_sampleBytes & 0x0F; diff --git a/plugins/samplesource/sdrdaemon/sdrdaemonbuffer.h b/plugins/samplesource/sdrdaemon/sdrdaemonbuffer.h index 7f90e2168..38f53f9f1 100644 --- a/plugins/samplesource/sdrdaemon/sdrdaemonbuffer.h +++ b/plugins/samplesource/sdrdaemon/sdrdaemonbuffer.h @@ -63,12 +63,14 @@ public: SDRdaemonBuffer(std::size_t blockSize); ~SDRdaemonBuffer(); - bool writeAndRead(uint8_t *array, std::size_t length, uint8_t *data, std::size_t& dataLength); + bool readMeta(char *array, std::size_t length); //!< Attempt to read meta. Returns true if meta block + void writeData(char *array, std::size_t length); //!< Write data into buffer. const MetaData& getCurrentMeta() const { return m_currentMeta; } + bool isSync() const { return m_sync; } private: - bool writeAndReadLZ4(uint8_t *array, std::size_t length, uint8_t *data, std::size_t& dataLength); - void updateSizes(MetaData *metaData); + void updateLZ4Sizes(MetaData *metaData); + void writeDataLZ4(char *array, std::size_t length); void updateBufferSize(); void printMeta(MetaData *metaData); diff --git a/plugins/samplesource/sdrdaemon/sdrdaemonthread.cpp b/plugins/samplesource/sdrdaemon/sdrdaemonthread.cpp index 4a261a2f9..91cc84ba6 100644 --- a/plugins/samplesource/sdrdaemon/sdrdaemonthread.cpp +++ b/plugins/samplesource/sdrdaemon/sdrdaemonthread.cpp @@ -18,24 +18,33 @@ #include #include #include "dsp/samplefifo.h" +#include #include #include "sdrdaemonthread.h" const int SDRdaemonThread::m_rateDivider = 1000/SDRDAEMON_THROTTLE_MS; +const int SDRdaemonThread::m_udpPayloadSize = 512; SDRdaemonThread::SDRdaemonThread(std::ifstream *samplesStream, SampleFifo* sampleFifo, QObject* parent) : QThread(parent), m_running(false), + m_dataSocket(0), + m_dataAddress(QHostAddress::LocalHost), + m_dataPort(9090), + m_dataConnected(false), m_ifstream(samplesStream), m_buf(0), + m_udpBuf(0), m_bufsize(0), m_chunksize(0), m_sampleFifo(sampleFifo), m_samplesCount(0), + m_sdrDaemonBuffer(m_udpPayloadSize), m_samplerate(0) { assert(m_ifstream != 0); + m_udpBuf = new char[m_udpPayloadSize]; } SDRdaemonThread::~SDRdaemonThread() @@ -44,6 +53,10 @@ SDRdaemonThread::~SDRdaemonThread() stopWork(); } + if (m_udpBuf != 0) { + free(m_udpBuf); + } + if (m_buf != 0) { free(m_buf); } @@ -52,30 +65,48 @@ SDRdaemonThread::~SDRdaemonThread() void SDRdaemonThread::startWork() { qDebug() << "SDRdaemonThread::startWork: "; + + if (!m_dataSocket) { + m_dataSocket = new QUdpSocket(this); + } - if (m_ifstream->is_open()) - { - qDebug() << " - file stream open, starting..."; - m_startWaitMutex.lock(); - start(); - while(!m_running) - m_startWaiter.wait(&m_startWaitMutex, 100); - m_startWaitMutex.unlock(); - } + if (m_dataSocket->bind(m_dataAddress, m_dataPort)) + { + qDebug("SDRdaemonThread::startWork: bind data socket to port %d", m_dataPort); + connect(m_dataSocket, SIGNAL(readyRead()), this, SLOT(dataReadyRead())); + + m_startWaitMutex.lock(); + start(); + while(!m_running) + m_startWaiter.wait(&m_startWaitMutex, 100); + m_startWaitMutex.unlock(); + m_dataConnected = true; + } else { - qDebug() << " - file stream closed, not starting."; + qWarning("SDRdaemonThread::startWork: cannot bind data port %d", m_dataPort); + m_dataConnected = false; } } void SDRdaemonThread::stopWork() { qDebug() << "SDRdaemonThread::stopWork"; + + if (m_dataConnected) { + disconnect(m_dataSocket, SIGNAL(readyRead()), this, SLOT(dataReadyRead())); + } + + if (m_dataSocket) { + delete m_dataSocket; + m_dataSocket = 0; + } + m_running = false; wait(); } -void SDRdaemonThread::setSamplerate(int samplerate) +void SDRdaemonThread::setSamplerate(uint32_t samplerate) { qDebug() << "SDRdaemonThread::setSamplerate:" << " new:" << samplerate @@ -151,3 +182,28 @@ void SDRdaemonThread::tick() } } } + +void SDRdaemonThread::dataReadyRead() +{ + while (m_dataSocket->hasPendingDatagrams()) + { + qint64 pendingDataSize = m_dataSocket->pendingDatagramSize(); + qint64 readBytes = m_dataSocket->readDatagram(m_udpBuf, pendingDataSize, 0, 0); + + if (readBytes < 0) + { + qDebug() << "SDRdaemonThread::dataReadyRead: read failed"; + } + else if (readBytes > 0) + { + if (m_sdrDaemonBuffer.readMeta(m_udpBuf, readBytes)) + { + setSamplerate(m_sdrDaemonBuffer.getCurrentMeta().m_sampleRate); + } + else if (m_sdrDaemonBuffer.isSync()) + { + m_sdrDaemonBuffer.writeData(m_udpBuf, readBytes); + } + } + } +} diff --git a/plugins/samplesource/sdrdaemon/sdrdaemonthread.h b/plugins/samplesource/sdrdaemon/sdrdaemonthread.h index 3464a095e..c1b1d1b30 100644 --- a/plugins/samplesource/sdrdaemon/sdrdaemonthread.h +++ b/plugins/samplesource/sdrdaemon/sdrdaemonthread.h @@ -21,12 +21,14 @@ #include #include #include +#include #include #include #include #include "dsp/samplefifo.h" #include "dsp/inthalfbandfilter.h" +#include "sdrdaemonbuffer.h" #define SDRDAEMON_THROTTLE_MS 50 @@ -41,12 +43,15 @@ public: void startWork(); void stopWork(); - void setSamplerate(int samplerate); + void setSamplerate(uint32_t samplerate); bool isRunning() const { return m_running; } std::size_t getSamplesCount() const { return m_samplesCount; } void connectTimer(const QTimer& timer); +public slots: + void dataReadyRead(); + private: QMutex m_startWaitMutex; QWaitCondition m_startWaiter; @@ -54,14 +59,21 @@ private: std::ifstream* m_ifstream; QUdpSocket *m_dataSocket; - quint8 *m_buf; + QHostAddress m_dataAddress; + int m_dataPort; + bool m_dataConnected; + quint8 *m_buf; + char *m_udpBuf; std::size_t m_bufsize; std::size_t m_chunksize; SampleFifo* m_sampleFifo; std::size_t m_samplesCount; - int m_samplerate; + SDRdaemonBuffer m_sdrDaemonBuffer; + + uint32_t m_samplerate; static const int m_rateDivider; + static const int m_udpPayloadSize; void run(); private slots: