From 96411edd3c272ef9509115947dec581f2319c2da Mon Sep 17 00:00:00 2001 From: f4exb <f4exb06@gmail.com> Date: Sun, 12 Dec 2021 10:44:58 +0100 Subject: [PATCH] Remote Output and Remote Source more fixes --- plugins/channelrx/remotesink/remotesink.h | 3 - .../channelrx/remotesink/remotesinkfifo.cpp | 10 +- plugins/channelrx/remotesink/remotesinkfifo.h | 6 +- .../channelrx/remotesink/remotesinksender.cpp | 26 ++-- .../channelrx/remotesink/remotesinksender.h | 6 +- .../channelrx/remotesink/remotesinksink.cpp | 24 ++-- plugins/channelrx/remotesink/remotesinksink.h | 2 +- .../remotesource/remotesourcesource.cpp | 67 ++++----- .../remotesource/remotesourcesource.h | 2 +- .../remotesource/remotesourceworker.cpp | 135 +++++++++--------- .../remotesource/remotesourceworker.h | 12 +- .../samplesink/remoteoutput/remoteoutput.cpp | 21 ++- .../remoteoutput/remoteoutputfifo.cpp | 10 +- .../remoteoutput/remoteoutputfifo.h | 6 +- .../remoteoutput/remoteoutputgui.cpp | 7 +- .../remoteoutput/remoteoutputgui.ui | 32 ++--- .../remoteoutput/remoteoutputsender.cpp | 32 ++--- .../remoteoutput/remoteoutputsender.h | 6 +- .../remoteoutput/remoteoutputworker.cpp | 1 + .../remoteoutput/remoteoutputworker.h | 2 +- .../samplesink/remoteoutput/udpsinkfec.cpp | 24 ++-- plugins/samplesink/remoteoutput/udpsinkfec.h | 2 +- sdrbase/channel/remotedatablock.h | 8 +- sdrbase/channel/remotedataqueue.cpp | 13 +- sdrbase/channel/remotedataqueue.h | 11 +- sdrbase/channel/remotedatareadqueue.cpp | 57 +++----- sdrbase/channel/remotedatareadqueue.h | 25 ++-- 27 files changed, 259 insertions(+), 291 deletions(-) diff --git a/plugins/channelrx/remotesink/remotesink.h b/plugins/channelrx/remotesink/remotesink.h index 605a3f4f1..c78cf49d6 100644 --- a/plugins/channelrx/remotesink/remotesink.h +++ b/plugins/channelrx/remotesink/remotesink.h @@ -113,9 +113,6 @@ public: static const char* const m_channelIdURI; static const char* const m_channelId; -signals: - void dataBlockAvailable(RemoteDataBlock *dataBlock); - private: DeviceAPI *m_deviceAPI; QThread *m_thread; diff --git a/plugins/channelrx/remotesink/remotesinkfifo.cpp b/plugins/channelrx/remotesink/remotesinkfifo.cpp index 2683e6c5b..fd1572fb2 100644 --- a/plugins/channelrx/remotesink/remotesinkfifo.cpp +++ b/plugins/channelrx/remotesink/remotesinkfifo.cpp @@ -47,7 +47,7 @@ void RemoteSinkFifo::reset() m_writeHead = 0; } -RemoteDataBlock *RemoteSinkFifo::getDataBlock() +RemoteDataFrame *RemoteSinkFifo::getDataFrame() { QMutexLocker mutexLocker(&m_mutex); m_servedHead = m_writeHead; @@ -62,18 +62,18 @@ RemoteDataBlock *RemoteSinkFifo::getDataBlock() return &m_data[m_servedHead]; } -unsigned int RemoteSinkFifo::readDataBlock(RemoteDataBlock **dataBlock) +unsigned int RemoteSinkFifo::readDataFrame(RemoteDataFrame **dataFrame) { QMutexLocker mutexLocker(&m_mutex); if (calculateRemainder() == 0) { - *dataBlock = nullptr; + *dataFrame = nullptr; return 0; } else { - *dataBlock = &m_data[m_readHead]; + *dataFrame = &m_data[m_readHead]; m_readHead = m_readHead < m_size - 1 ? m_readHead + 1 : 0; return calculateRemainder(); } @@ -92,4 +92,4 @@ unsigned int RemoteSinkFifo::calculateRemainder() } else { return m_size - (m_readHead - m_servedHead); } -} \ No newline at end of file +} diff --git a/plugins/channelrx/remotesink/remotesinkfifo.h b/plugins/channelrx/remotesink/remotesinkfifo.h index 7e0d47b13..7060a5e52 100644 --- a/plugins/channelrx/remotesink/remotesinkfifo.h +++ b/plugins/channelrx/remotesink/remotesinkfifo.h @@ -34,15 +34,15 @@ public: void resize(unsigned int size); void reset(); - RemoteDataBlock *getDataBlock(); - unsigned int readDataBlock(RemoteDataBlock **dataBlock); + RemoteDataFrame *getDataFrame(); + unsigned int readDataFrame(RemoteDataFrame **dataFrame); unsigned int getRemainder(); signals: void dataBlockServed(); private: - std::vector<RemoteDataBlock> m_data; + std::vector<RemoteDataFrame> m_data; int m_size; int m_readHead; //!< index of last data block processed int m_servedHead; //!< index of last data block served diff --git a/plugins/channelrx/remotesink/remotesinksender.cpp b/plugins/channelrx/remotesink/remotesinksender.cpp index a3fb839b4..ed0e894d8 100644 --- a/plugins/channelrx/remotesink/remotesinksender.cpp +++ b/plugins/channelrx/remotesink/remotesinksender.cpp @@ -57,37 +57,37 @@ RemoteSinkSender::~RemoteSinkSender() m_socket->deleteLater(); } -RemoteDataBlock *RemoteSinkSender::getDataBlock() +RemoteDataFrame *RemoteSinkSender::getDataFrame() { - return m_fifo.getDataBlock(); + return m_fifo.getDataFrame(); } void RemoteSinkSender::handleData() { - RemoteDataBlock *dataBlock; + RemoteDataFrame *dataFrame; unsigned int remainder = m_fifo.getRemainder(); while (remainder != 0) { - remainder = m_fifo.readDataBlock(&dataBlock); + remainder = m_fifo.readDataFrame(&dataFrame); - if (dataBlock) { - sendDataBlock(dataBlock); + if (dataFrame) { + sendDataFrame(dataFrame); } } } -void RemoteSinkSender::sendDataBlock(RemoteDataBlock *dataBlock) +void RemoteSinkSender::sendDataFrame(RemoteDataFrame *dataFrame) { CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder RemoteProtectedBlock fecBlocks[256]; //!< FEC data - uint16_t frameIndex = dataBlock->m_txControlBlock.m_frameIndex; - int nbBlocksFEC = dataBlock->m_txControlBlock.m_nbBlocksFEC; - m_address.setAddress(dataBlock->m_txControlBlock.m_dataAddress); - uint16_t dataPort = dataBlock->m_txControlBlock.m_dataPort; - RemoteSuperBlock *txBlockx = dataBlock->m_superBlocks; + uint16_t frameIndex = dataFrame->m_txControlBlock.m_frameIndex; + int nbBlocksFEC = dataFrame->m_txControlBlock.m_nbBlocksFEC; + m_address.setAddress(dataFrame->m_txControlBlock.m_dataAddress); + uint16_t dataPort = dataFrame->m_txControlBlock.m_dataPort; + RemoteSuperBlock *txBlockx = dataFrame->m_superBlocks; if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode { @@ -141,5 +141,5 @@ void RemoteSinkSender::sendDataBlock(RemoteDataBlock *dataBlock) } } - dataBlock->m_txControlBlock.m_processed = true; + dataFrame->m_txControlBlock.m_processed = true; } diff --git a/plugins/channelrx/remotesink/remotesinksender.h b/plugins/channelrx/remotesink/remotesinksender.h index 98d74279c..f73a3e411 100644 --- a/plugins/channelrx/remotesink/remotesinksender.h +++ b/plugins/channelrx/remotesink/remotesinksender.h @@ -36,7 +36,7 @@ #include "remotesinkfifo.h" -class RemoteDataBlock; +class RemoteDataFrame; class CM256; class QUdpSocket; @@ -47,7 +47,7 @@ public: RemoteSinkSender(); ~RemoteSinkSender(); - RemoteDataBlock *getDataBlock(); + RemoteDataFrame *getDataFrame(); private: RemoteSinkFifo m_fifo; @@ -57,7 +57,7 @@ private: QHostAddress m_address; QUdpSocket *m_socket; - void sendDataBlock(RemoteDataBlock *dataBlock); + void sendDataFrame(RemoteDataFrame *dataFrame); private slots: void handleData(); diff --git a/plugins/channelrx/remotesink/remotesinksink.cpp b/plugins/channelrx/remotesink/remotesinksink.cpp index dbf852f6a..920e59902 100644 --- a/plugins/channelrx/remotesink/remotesinksink.cpp +++ b/plugins/channelrx/remotesink/remotesinksink.cpp @@ -31,7 +31,7 @@ RemoteSinkSink::RemoteSinkSink() : m_txBlockIndex(0), m_frameCount(0), m_sampleIndex(0), - m_dataBlock(nullptr), + m_dataFrame(nullptr), m_deviceCenterFrequency(0), m_frequencyOffset(0), m_basebandSampleRate(48000), @@ -99,14 +99,14 @@ void RemoteSinkSink::feed(const SampleVector::const_iterator& begin, const Sampl metaData.m_tv_sec = nowus / 1000000UL; // tv.tv_sec; metaData.m_tv_usec = nowus % 1000000UL; // tv.tv_usec; - if (!m_dataBlock) { // on the very first cycle there is no data block allocated - m_dataBlock = m_remoteSinkSender->getDataBlock(); // ask a new block to sender + if (!m_dataFrame) { // on the very first cycle there is no data block allocated + m_dataFrame = m_remoteSinkSender->getDataFrame(); // ask a new block to sender } boost::crc_32_type crc32; crc32.process_bytes(&metaData, sizeof(RemoteMetaDataFEC)-4); metaData.m_crc32 = crc32.checksum(); - RemoteSuperBlock& superBlock = m_dataBlock->m_superBlocks[0]; // first block + RemoteSuperBlock& superBlock = m_dataFrame->m_superBlocks[0]; // first block superBlock.init(); superBlock.m_header.m_frameIndex = m_frameCount; superBlock.m_header.m_blockIndex = m_txBlockIndex; @@ -156,18 +156,18 @@ void RemoteSinkSink::feed(const SampleVector::const_iterator& begin, const Sampl m_superBlock.m_header.m_blockIndex = m_txBlockIndex; m_superBlock.m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); m_superBlock.m_header.m_sampleBits = SDR_RX_SAMP_SZ; - m_dataBlock->m_superBlocks[m_txBlockIndex] = m_superBlock; + m_dataFrame->m_superBlocks[m_txBlockIndex] = m_superBlock; if (m_txBlockIndex == RemoteNbOrginalBlocks - 1) // frame complete { - m_dataBlock->m_txControlBlock.m_frameIndex = m_frameCount; - m_dataBlock->m_txControlBlock.m_processed = false; - m_dataBlock->m_txControlBlock.m_complete = true; - m_dataBlock->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC; - m_dataBlock->m_txControlBlock.m_dataAddress = m_dataAddress; - m_dataBlock->m_txControlBlock.m_dataPort = m_dataPort; + m_dataFrame->m_txControlBlock.m_frameIndex = m_frameCount; + m_dataFrame->m_txControlBlock.m_processed = false; + m_dataFrame->m_txControlBlock.m_complete = true; + m_dataFrame->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC; + m_dataFrame->m_txControlBlock.m_dataAddress = m_dataAddress; + m_dataFrame->m_txControlBlock.m_dataPort = m_dataPort; - m_dataBlock = m_remoteSinkSender->getDataBlock(); // ask a new block to sender + m_dataFrame = m_remoteSinkSender->getDataFrame(); // ask a new block to sender m_txBlockIndex = 0; m_frameCount++; diff --git a/plugins/channelrx/remotesink/remotesinksink.h b/plugins/channelrx/remotesink/remotesinksink.h index 8456fbde4..b0423bdc5 100644 --- a/plugins/channelrx/remotesink/remotesinksink.h +++ b/plugins/channelrx/remotesink/remotesinksink.h @@ -55,7 +55,7 @@ private: int m_sampleIndex; //!< Current sample index in protected block data RemoteSuperBlock m_superBlock; RemoteMetaDataFEC m_currentMetaFEC; - RemoteDataBlock *m_dataBlock; + RemoteDataFrame *m_dataFrame; uint64_t m_deviceCenterFrequency; int64_t m_frequencyOffset; diff --git a/plugins/channeltx/remotesource/remotesourcesource.cpp b/plugins/channeltx/remotesource/remotesourcesource.cpp index afe542a31..7458320cc 100644 --- a/plugins/channeltx/remotesource/remotesourcesource.cpp +++ b/plugins/channeltx/remotesource/remotesourcesource.cpp @@ -31,7 +31,7 @@ RemoteSourceSource::RemoteSourceSource() : connect(&m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection); m_cm256p = m_cm256.isInitialized() ? &m_cm256 : 0; m_currentMeta.init(); - m_dataReadQueue.setSize(50); + m_dataReadQueue.setSize(20); applyChannelSettings(m_channelSampleRate, true); } @@ -53,7 +53,8 @@ void RemoteSourceSource::pull(SampleVector::iterator begin, unsigned int nbSampl void RemoteSourceSource::pullOne(Sample& sample) { - // m_dataReadQueue.readSample(sample, true); // true is scale for Tx + m_dataReadQueue.readSample(sample, true); // true is scale for Tx + return; Complex ci; @@ -129,18 +130,18 @@ void RemoteSourceSource::stopWorker() void RemoteSourceSource::handleData() { - RemoteDataBlock* dataBlock; + RemoteDataFrame* dataFrame; - while (m_running && ((dataBlock = m_dataQueue.pop()) != nullptr)) { - handleDataBlock(dataBlock); + while (m_running && ((dataFrame = m_dataQueue.pop()) != nullptr)) { + handleDataFrame(dataFrame); } } -void RemoteSourceSource::handleDataBlock(RemoteDataBlock* dataBlock) +void RemoteSourceSource::handleDataFrame(RemoteDataFrame* dataFrame) { - if (dataBlock->m_rxControlBlock.m_blockCount < RemoteNbOrginalBlocks) + if (dataFrame->m_rxControlBlock.m_blockCount < RemoteNbOrginalBlocks) { - qWarning("RemoteSourceSource::handleDataBlock: incomplete data block: not processing"); + qWarning("RemoteSourceSource::handleDataFrame: incomplete data frame: not processing"); } else { @@ -148,69 +149,69 @@ void RemoteSourceSource::handleDataBlock(RemoteDataBlock* dataBlock) for (int blockIndex = 0; blockIndex < 256; blockIndex++) { - if ((blockIndex == 0) && (dataBlock->m_rxControlBlock.m_metaRetrieved)) + if ((blockIndex == 0) && (dataFrame->m_rxControlBlock.m_metaRetrieved)) { m_cm256DescriptorBlocks[blockCount].Index = 0; - m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock->m_superBlocks[0].m_protectedBlock); + m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataFrame->m_superBlocks[0].m_protectedBlock); blockCount++; } - else if (dataBlock->m_superBlocks[blockIndex].m_header.m_blockIndex != 0) + else if (dataFrame->m_superBlocks[blockIndex].m_header.m_blockIndex != 0) { - m_cm256DescriptorBlocks[blockCount].Index = dataBlock->m_superBlocks[blockIndex].m_header.m_blockIndex; - m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock->m_superBlocks[blockIndex].m_protectedBlock); + m_cm256DescriptorBlocks[blockCount].Index = dataFrame->m_superBlocks[blockIndex].m_header.m_blockIndex; + m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataFrame->m_superBlocks[blockIndex].m_protectedBlock); blockCount++; } } - //qDebug("RemoteSourceSource::handleDataBlock: frame: %u blocks: %d", dataBlock.m_rxControlBlock.m_frameIndex, blockCount); + //qDebug("RemoteSourceSource::handleDataFrame: frame: %u blocks: %d", dataFrame.m_rxControlBlock.m_frameIndex, blockCount); // Need to use the CM256 recovery - if (m_cm256p &&(dataBlock->m_rxControlBlock.m_originalCount < RemoteNbOrginalBlocks)) + if (m_cm256p &&(dataFrame->m_rxControlBlock.m_originalCount < RemoteNbOrginalBlocks)) { - qDebug("RemoteSourceSource::handleDataBlock: %d recovery blocks", dataBlock->m_rxControlBlock.m_recoveryCount); + qDebug("RemoteSourceSource::handleDataFrame: %d recovery blocks", dataFrame->m_rxControlBlock.m_recoveryCount); CM256::cm256_encoder_params paramsCM256; paramsCM256.BlockBytes = sizeof(RemoteProtectedBlock); // never changes paramsCM256.OriginalCount = RemoteNbOrginalBlocks; // never changes if (m_currentMeta.m_tv_sec == 0) { - paramsCM256.RecoveryCount = dataBlock->m_rxControlBlock.m_recoveryCount; + paramsCM256.RecoveryCount = dataFrame->m_rxControlBlock.m_recoveryCount; } else { paramsCM256.RecoveryCount = m_currentMeta.m_nbFECBlocks; } // update counters - if (dataBlock->m_rxControlBlock.m_originalCount < RemoteNbOrginalBlocks - paramsCM256.RecoveryCount) { - m_nbUncorrectableErrors += RemoteNbOrginalBlocks - paramsCM256.RecoveryCount - dataBlock->m_rxControlBlock.m_originalCount; + if (dataFrame->m_rxControlBlock.m_originalCount < RemoteNbOrginalBlocks - paramsCM256.RecoveryCount) { + m_nbUncorrectableErrors += RemoteNbOrginalBlocks - paramsCM256.RecoveryCount - dataFrame->m_rxControlBlock.m_originalCount; } else { - m_nbCorrectableErrors += dataBlock->m_rxControlBlock.m_recoveryCount; + m_nbCorrectableErrors += dataFrame->m_rxControlBlock.m_recoveryCount; } if (m_cm256.cm256_decode(paramsCM256, m_cm256DescriptorBlocks)) // CM256 decode { - qWarning() << "RemoteSourceSource::handleDataBlock: decode CM256 error:" - << " m_originalCount: " << dataBlock->m_rxControlBlock.m_originalCount - << " m_recoveryCount: " << dataBlock->m_rxControlBlock.m_recoveryCount; + qWarning() << "RemoteSourceSource::handleDataFrame: decode CM256 error:" + << " m_originalCount: " << dataFrame->m_rxControlBlock.m_originalCount + << " m_recoveryCount: " << dataFrame->m_rxControlBlock.m_recoveryCount; } else { - for (int ir = 0; ir < dataBlock->m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks + for (int ir = 0; ir < dataFrame->m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks { - int recoveryIndex = RemoteNbOrginalBlocks - dataBlock->m_rxControlBlock.m_recoveryCount + ir; + int recoveryIndex = RemoteNbOrginalBlocks - dataFrame->m_rxControlBlock.m_recoveryCount + ir; int blockIndex = m_cm256DescriptorBlocks[recoveryIndex].Index; RemoteProtectedBlock *recoveredBlock = (RemoteProtectedBlock *) m_cm256DescriptorBlocks[recoveryIndex].Block; - memcpy((void *) &(dataBlock->m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(RemoteProtectedBlock)); - if ((blockIndex == 0) && !dataBlock->m_rxControlBlock.m_metaRetrieved) { - dataBlock->m_rxControlBlock.m_metaRetrieved = true; + memcpy((void *) &(dataFrame->m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(RemoteProtectedBlock)); + if ((blockIndex == 0) && !dataFrame->m_rxControlBlock.m_metaRetrieved) { + dataFrame->m_rxControlBlock.m_metaRetrieved = true; } } } } // Validate block zero and retrieve its data - if (dataBlock->m_rxControlBlock.m_metaRetrieved) + if (dataFrame->m_rxControlBlock.m_metaRetrieved) { - RemoteMetaDataFEC *metaData = (RemoteMetaDataFEC *) &(dataBlock->m_superBlocks[0].m_protectedBlock); + RemoteMetaDataFEC *metaData = (RemoteMetaDataFEC *) &(dataFrame->m_superBlocks[0].m_protectedBlock); boost::crc_32_type crc32; crc32.process_bytes(metaData, sizeof(RemoteMetaDataFEC)-4); @@ -218,7 +219,7 @@ void RemoteSourceSource::handleDataBlock(RemoteDataBlock* dataBlock) { if (!(m_currentMeta == *metaData)) { - printMeta("RemoteSourceSource::handleDataBlock", metaData); + printMeta("RemoteSourceSource::handleDataFrame", metaData); if (m_currentMeta.m_sampleRate != metaData->m_sampleRate) { emit newRemoteSampleRate(metaData->m_sampleRate); @@ -230,11 +231,11 @@ void RemoteSourceSource::handleDataBlock(RemoteDataBlock* dataBlock) } else { - qWarning() << "RemoteSource::handleDataBlock: recovered meta: invalid CRC32"; + qWarning() << "RemoteSource::handleDataFrame: recovered meta: invalid CRC32"; } } - m_dataReadQueue.push(dataBlock); // Push into R/W buffer + m_dataReadQueue.push(dataFrame); // Push into R/W buffer } } diff --git a/plugins/channeltx/remotesource/remotesourcesource.h b/plugins/channeltx/remotesource/remotesourcesource.h index 83b7fd07c..637144bb4 100644 --- a/plugins/channeltx/remotesource/remotesourcesource.h +++ b/plugins/channeltx/remotesource/remotesourcesource.h @@ -80,7 +80,7 @@ private: void startWorker(); void stopWorker(); - void handleDataBlock(RemoteDataBlock *dataBlock); + void handleDataFrame(RemoteDataFrame *dataFrame); void printMeta(const QString& header, RemoteMetaDataFEC *metaData); void getSample(); diff --git a/plugins/channeltx/remotesource/remotesourceworker.cpp b/plugins/channeltx/remotesource/remotesourceworker.cpp index 91d60b059..7926af78a 100644 --- a/plugins/channeltx/remotesource/remotesourceworker.cpp +++ b/plugins/channeltx/remotesource/remotesourceworker.cpp @@ -33,9 +33,12 @@ RemoteSourceWorker::RemoteSourceWorker(RemoteDataQueue *dataQueue, QObject* pare m_address(QHostAddress::LocalHost), m_socket(this), m_mutex(QMutex::Recursive), - m_sampleRate(0) + m_sampleRate(0), + m_udpReadBytes(0) { - std::fill(m_dataBlocks, m_dataBlocks+4, (RemoteDataBlock *) 0); + m_udpBuf = new char[RemoteUdpSize]; + + std::fill(m_dataFrames, m_dataFrames+4, (RemoteDataFrame *) 0); connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); connect(&m_socket, SIGNAL(readyRead()),this, SLOT(recv())); #if QT_VERSION < QT_VERSION_CHECK(5, 15, 0) @@ -106,87 +109,87 @@ void RemoteSourceWorker::handleInputMessages() QMutexLocker mutexLocker(&m_mutex); MsgDataBind* notif = (MsgDataBind*) message; qDebug("RemoteSourceWorker::handleInputMessages: MsgDataBind: %s:%d", qPrintable(notif->getAddress().toString()), notif->getPort()); - disconnect(&m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams())); + disconnect(&m_socket, SIGNAL(readyRead()), this, SLOT(dataReadyRead())); m_socket.abort(); m_socket.bind(notif->getAddress(), notif->getPort()); - connect(&m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams())); + connect(&m_socket, SIGNAL(readyRead()), this, SLOT(dataReadyRead())); } } } -void RemoteSourceWorker::readPendingDatagrams() +void RemoteSourceWorker::dataReadyRead() { - RemoteSuperBlock superBlock; - qint64 size; + m_udpReadBytes = 0; - while (m_socket.hasPendingDatagrams()) - { + while (m_socket.hasPendingDatagrams()) + { + qint64 pendingDataSize = m_socket.pendingDatagramSize(); QHostAddress sender; - quint16 senderPort = 0; - //qint64 pendingDataSize = m_socket->pendingDatagramSize(); - size = m_socket.readDatagram((char *) &superBlock, (long long int) sizeof(RemoteSuperBlock), &sender, &senderPort); + m_udpReadBytes += m_socket.readDatagram(&m_udpBuf[m_udpReadBytes], pendingDataSize, &sender, nullptr); - if (size == sizeof(RemoteSuperBlock)) + if (m_udpReadBytes == RemoteUdpSize) { - unsigned int dataBlockIndex = superBlock.m_header.m_frameIndex % m_nbDataBlocks; - int blockIndex = superBlock.m_header.m_blockIndex; + processData(); + m_udpReadBytes = 0; + } + } +} - if (blockIndex == 0) // first block with meta data - { - const RemoteMetaDataFEC *metaData = (const RemoteMetaDataFEC *) &superBlock.m_protectedBlock; - uint32_t sampleRate = metaData->m_sampleRate; +void RemoteSourceWorker::processData() +{ + RemoteSuperBlock *superBlock = (RemoteSuperBlock *) m_udpBuf; + unsigned int dataBlockIndex = superBlock->m_header.m_frameIndex % m_nbDataFrames; + int blockIndex = superBlock->m_header.m_blockIndex; - if (m_sampleRate != sampleRate) - { - qDebug("RemoteSourceWorker::readPendingDatagrams: sampleRate: %u", sampleRate); - m_socket.setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, getDataSocketBufferSize(sampleRate)); - m_sampleRate = sampleRate; - } - } + if (blockIndex == 0) // first block with meta data + { + const RemoteMetaDataFEC *metaData = (const RemoteMetaDataFEC *) &superBlock->m_protectedBlock; + uint32_t sampleRate = metaData->m_sampleRate; - // create the first block for this index - if (m_dataBlocks[dataBlockIndex] == 0) { - m_dataBlocks[dataBlockIndex] = new RemoteDataBlock(); - } - - if (m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex < 0) - { - // initialize virgin block with the frame index - m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex; - } - else - { - // if the frame index is not the same for the same slot it means we are starting a new frame - uint32_t frameIndex = m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex; - - if (superBlock.m_header.m_frameIndex != frameIndex) - { - //qDebug("RemoteSourceWorker::readPendingDatagrams: push frame %u", frameIndex); - m_dataQueue->push(m_dataBlocks[dataBlockIndex]); - m_dataBlocks[dataBlockIndex] = new RemoteDataBlock(); - m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex; - } - } - - m_dataBlocks[dataBlockIndex]->m_superBlocks[superBlock.m_header.m_blockIndex] = superBlock; - - if (superBlock.m_header.m_blockIndex == 0) { - m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_metaRetrieved = true; - } - - if (superBlock.m_header.m_blockIndex < RemoteNbOrginalBlocks) { - m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_originalCount++; - } else { - m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_recoveryCount++; - } - - m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_blockCount++; - } - else + if (m_sampleRate != sampleRate) { - qWarning("RemoteSourceWorker::readPendingDatagrams: wrong super block size not processing"); + qDebug("RemoteSourceWorker::processData: sampleRate: %u", sampleRate); + m_socket.setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, getDataSocketBufferSize(sampleRate)); + m_sampleRate = sampleRate; } } + + // create the first block for this index + if (m_dataFrames[dataBlockIndex] == nullptr) { + m_dataFrames[dataBlockIndex] = new RemoteDataFrame(); + } + + if (m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_frameIndex < 0) + { + // initialize virgin block with the frame index + m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock->m_header.m_frameIndex; + } + else + { + // if the frame index is not the same for the same slot it means we are starting a new frame + uint32_t frameIndex = m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_frameIndex; + + if (superBlock->m_header.m_frameIndex != frameIndex) + { + m_dataQueue->push(m_dataFrames[dataBlockIndex]); + m_dataFrames[dataBlockIndex] = new RemoteDataFrame(); + m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock->m_header.m_frameIndex; + } + } + + m_dataFrames[dataBlockIndex]->m_superBlocks[superBlock->m_header.m_blockIndex] = *superBlock; + + if (superBlock->m_header.m_blockIndex == 0) { + m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_metaRetrieved = true; + } + + if (superBlock->m_header.m_blockIndex < RemoteNbOrginalBlocks) { + m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_originalCount++; + } else { + m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_recoveryCount++; + } + + m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_blockCount++; } int RemoteSourceWorker::getDataSocketBufferSize(uint32_t inSampleRate) diff --git a/plugins/channeltx/remotesource/remotesourceworker.h b/plugins/channeltx/remotesource/remotesourceworker.h index a5ee5375d..5784413e2 100644 --- a/plugins/channeltx/remotesource/remotesourceworker.h +++ b/plugins/channeltx/remotesource/remotesourceworker.h @@ -26,7 +26,7 @@ #include "util/messagequeue.h" class RemoteDataQueue; -class RemoteDataBlock; +class RemoteDataFrame; class RemoteSourceWorker : public QObject { Q_OBJECT @@ -71,18 +71,22 @@ private: QUdpSocket m_socket; QMutex m_mutex; - static const uint32_t m_nbDataBlocks = 4; //!< number of data blocks in the ring buffer - RemoteDataBlock *m_dataBlocks[m_nbDataBlocks]; //!< ring buffer of data blocks indexed by frame affinity + static const uint32_t m_nbDataFrames = 4; //!< number of data frames in the ring buffer + RemoteDataFrame *m_dataFrames[m_nbDataFrames]; //!< ring buffer of data frames indexed by frame affinity uint32_t m_sampleRate; //!< current sample rate from meta data + qint64 m_udpReadBytes; + char *m_udpBuf; + static int getDataSocketBufferSize(uint32_t inSampleRate); + void processData(); private slots: void started(); void finished(); void errorOccurred(QAbstractSocket::SocketError socketError); void handleInputMessages(); - void readPendingDatagrams(); + void dataReadyRead(); }; diff --git a/plugins/samplesink/remoteoutput/remoteoutput.cpp b/plugins/samplesink/remoteoutput/remoteoutput.cpp index 14b621c3f..a8d28cf48 100644 --- a/plugins/samplesink/remoteoutput/remoteoutput.cpp +++ b/plugins/samplesink/remoteoutput/remoteoutput.cpp @@ -200,14 +200,11 @@ bool RemoteOutput::handleMessage(const Message& message) MsgConfigureRemoteOutputWork& conf = (MsgConfigureRemoteOutputWork&) message; bool working = conf.isWorking(); - if (m_remoteOutputWorker != 0) + if (m_remoteOutputWorker != nullptr) { - if (working) - { + if (working) { m_remoteOutputWorker->startWork(); - } - else - { + } else { m_remoteOutputWorker->stopWork(); } } @@ -221,8 +218,7 @@ bool RemoteOutput::handleMessage(const Message& message) if (cmd.getStartStop()) { - if (m_deviceAPI->initDeviceEngine()) - { + if (m_deviceAPI->initDeviceEngine()) { m_deviceAPI->startDeviceEngine(); } } @@ -241,8 +237,7 @@ bool RemoteOutput::handleMessage(const Message& message) { MsgConfigureRemoteOutputChunkCorrection& conf = (MsgConfigureRemoteOutputChunkCorrection&) message; - if (m_remoteOutputWorker != 0) - { + if (m_remoteOutputWorker != nullptr) { m_remoteOutputWorker->setChunkCorrection(conf.getChunkCorrection()); } @@ -472,9 +467,8 @@ void RemoteOutput::webapiFormatDeviceSettings(SWGSDRangel::SWGDeviceSettings& re void RemoteOutput::webapiFormatDeviceReport(SWGSDRangel::SWGDeviceReport& response) { - uint64_t ts_usecs; response.getRemoteOutputReport()->setBufferRwBalance(m_sampleSourceFifo.getRWBalance()); - response.getRemoteOutputReport()->setSampleCount(m_remoteOutputWorker ? (int) m_remoteOutputWorker->getSamplesCount(ts_usecs) : 0); + response.getRemoteOutputReport()->setSampleCount(m_remoteOutputWorker ? (int) m_remoteOutputWorker->getSamplesCount() : 0); } void RemoteOutput::tick() @@ -615,7 +609,8 @@ void RemoteOutput::sampleRateCorrection(double remoteTimeDeltaUs, double timeDel double chunkCorr = 50000 * deltaSR; // for 50ms chunk intervals (50000us) m_chunkSizeCorrection += roundf(chunkCorr); - qDebug("RemoteOutput::sampleRateCorrection: %d (%f) samples", m_chunkSizeCorrection, chunkCorr); + qDebug("RemoteOutput::sampleRateCorrection: remote: %u / %f us local: %u / %f us corr: %d (%f) samples", + remoteSampleCount, remoteTimeDeltaUs, sampleCount, timeDeltaUs, m_chunkSizeCorrection, chunkCorr); MsgConfigureRemoteOutputChunkCorrection* message = MsgConfigureRemoteOutputChunkCorrection::create(m_chunkSizeCorrection); getInputMessageQueue()->push(message); diff --git a/plugins/samplesink/remoteoutput/remoteoutputfifo.cpp b/plugins/samplesink/remoteoutput/remoteoutputfifo.cpp index 8beee691e..9365cc183 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputfifo.cpp +++ b/plugins/samplesink/remoteoutput/remoteoutputfifo.cpp @@ -47,7 +47,7 @@ void RemoteOutputFifo::reset() m_writeHead = 0; } -RemoteDataBlock *RemoteOutputFifo::getDataBlock() +RemoteDataFrame *RemoteOutputFifo::getDataFrame() { QMutexLocker mutexLocker(&m_mutex); m_servedHead = m_writeHead; @@ -62,18 +62,18 @@ RemoteDataBlock *RemoteOutputFifo::getDataBlock() return &m_data[m_servedHead]; } -unsigned int RemoteOutputFifo::readDataBlock(RemoteDataBlock **dataBlock) +unsigned int RemoteOutputFifo::readDataFrame(RemoteDataFrame **dataFrame) { QMutexLocker mutexLocker(&m_mutex); if (calculateRemainder() == 0) { - *dataBlock = nullptr; + *dataFrame = nullptr; return 0; } else { - *dataBlock = &m_data[m_readHead]; + *dataFrame = &m_data[m_readHead]; m_readHead = m_readHead < m_size - 1 ? m_readHead + 1 : 0; return calculateRemainder(); } @@ -92,4 +92,4 @@ unsigned int RemoteOutputFifo::calculateRemainder() } else { return m_size - (m_readHead - m_servedHead); } -} \ No newline at end of file +} diff --git a/plugins/samplesink/remoteoutput/remoteoutputfifo.h b/plugins/samplesink/remoteoutput/remoteoutputfifo.h index aa870c286..532cb3be0 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputfifo.h +++ b/plugins/samplesink/remoteoutput/remoteoutputfifo.h @@ -34,15 +34,15 @@ public: void resize(unsigned int size); void reset(); - RemoteDataBlock *getDataBlock(); - unsigned int readDataBlock(RemoteDataBlock **dataBlock); + RemoteDataFrame *getDataFrame(); + unsigned int readDataFrame(RemoteDataFrame **dataFrame); unsigned int getRemainder(); signals: void dataBlockServed(); private: - std::vector<RemoteDataBlock> m_data; + std::vector<RemoteDataFrame> m_data; int m_size; int m_readHead; //!< index of last data block processed int m_servedHead; //!< index of last data block served diff --git a/plugins/samplesink/remoteoutput/remoteoutputgui.cpp b/plugins/samplesink/remoteoutput/remoteoutputgui.cpp index 5dd2acf38..7c4f76376 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputgui.cpp +++ b/plugins/samplesink/remoteoutput/remoteoutputgui.cpp @@ -73,9 +73,6 @@ RemoteOutputSinkGui::RemoteOutputSinkGui(DeviceUISet *deviceUISet, QWidget* pare ui->setupUi(this); - ui->centerFrequency->setColorMapper(ColorMapper(ColorMapper::GrayGold)); - ui->centerFrequency->setValueRange(7, 0, pow(10,7)); - ui->sampleRate->setColorMapper(ColorMapper(ColorMapper::GrayGreenYellow)); ui->sampleRate->setValueRange(7, 32000U, 9000000U); @@ -214,7 +211,7 @@ void RemoteOutputSinkGui::updateSampleRate() void RemoteOutputSinkGui::displaySettings() { blockApplySettings(true); - ui->centerFrequency->setValue(m_deviceCenterFrequency / 1000); + ui->centerFrequency->setText(QString("%L1").arg(m_deviceCenterFrequency)); ui->sampleRate->setValue(m_settings.m_sampleRate); ui->nbFECBlocks->setValue(m_settings.m_nbFECBlocks); @@ -530,7 +527,7 @@ void RemoteOutputSinkGui::analyzeApiReply(const QJsonObject& jsonObject) QJsonObject report = jsonObject["RemoteSourceReport"].toObject(); m_deviceCenterFrequency = report["deviceCenterFreq"].toInt() * 1000; m_deviceUISet->getSpectrum()->setCenterFrequency(m_deviceCenterFrequency); - ui->centerFrequency->setValue(m_deviceCenterFrequency/1000); + ui->centerFrequency->setText(QString("%L1").arg(m_deviceCenterFrequency)); int remoteRate = report["deviceSampleRate"].toInt(); ui->remoteRateText->setText(tr("%1k").arg((float)(remoteRate) / 1000)); int queueSize = report["queueSize"].toInt(); diff --git a/plugins/samplesink/remoteoutput/remoteoutputgui.ui b/plugins/samplesink/remoteoutput/remoteoutputgui.ui index 05b451bd2..30ad1d696 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputgui.ui +++ b/plugins/samplesink/remoteoutput/remoteoutputgui.ui @@ -115,36 +115,24 @@ </spacer> </item> <item> - <widget class="ValueDial" name="centerFrequency" native="true"> - <property name="enabled"> - <bool>false</bool> - </property> - <property name="sizePolicy"> - <sizepolicy hsizetype="Maximum" vsizetype="Maximum"> - <horstretch>0</horstretch> - <verstretch>0</verstretch> - </sizepolicy> - </property> + <widget class="QLabel" name="centerFrequency"> <property name="minimumSize"> <size> - <width>32</width> - <height>16</height> + <width>140</width> + <height>0</height> </size> </property> <property name="font"> <font> - <family>Liberation Mono</family> - <pointsize>20</pointsize> + <family>Liberation Sans</family> + <pointsize>14</pointsize> </font> </property> - <property name="cursor"> - <cursorShape>PointingHandCursor</cursorShape> + <property name="text"> + <string>10,000,000,000</string> </property> - <property name="focusPolicy"> - <enum>Qt::StrongFocus</enum> - </property> - <property name="toolTip"> - <string>Record center frequency in kHz</string> + <property name="alignment"> + <set>Qt::AlignRight|Qt::AlignTrailing|Qt::AlignVCenter</set> </property> </widget> </item> @@ -155,7 +143,7 @@ <item> <widget class="QLabel" name="freqUnits"> <property name="text"> - <string> kHz</string> + <string>Hz</string> </property> </widget> </item> diff --git a/plugins/samplesink/remoteoutput/remoteoutputsender.cpp b/plugins/samplesink/remoteoutput/remoteoutputsender.cpp index c68834775..1c7ba8420 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputsender.cpp +++ b/plugins/samplesink/remoteoutput/remoteoutputsender.cpp @@ -63,43 +63,43 @@ void RemoteOutputSender::setDestination(const QString& address, uint16_t port) m_remoteHostAddress.setAddress(address); } -RemoteDataBlock *RemoteOutputSender::getDataBlock() +RemoteDataFrame *RemoteOutputSender::getDataFrame() { - return m_fifo.getDataBlock(); + return m_fifo.getDataFrame(); } void RemoteOutputSender::handleData() { - RemoteDataBlock *dataBlock; + RemoteDataFrame *dataFrame; unsigned int remainder = m_fifo.getRemainder(); while (remainder != 0) { - remainder = m_fifo.readDataBlock(&dataBlock); + remainder = m_fifo.readDataFrame(&dataFrame); - if (dataBlock) { - sendDataBlock(dataBlock); + if (dataFrame) { + sendDataFrame(dataFrame); } } } -void RemoteOutputSender::sendDataBlock(RemoteDataBlock *dataBlock) +void RemoteOutputSender::sendDataFrame(RemoteDataFrame *dataFrame) { CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder RemoteProtectedBlock fecBlocks[256]; //!< FEC data - uint16_t frameIndex = dataBlock->m_txControlBlock.m_frameIndex; - int nbBlocksFEC = dataBlock->m_txControlBlock.m_nbBlocksFEC; - m_remoteHostAddress.setAddress(dataBlock->m_txControlBlock.m_dataAddress); - uint16_t dataPort = dataBlock->m_txControlBlock.m_dataPort; - RemoteSuperBlock *txBlockx = dataBlock->m_superBlocks; + uint16_t frameIndex = dataFrame->m_txControlBlock.m_frameIndex; + int nbBlocksFEC = dataFrame->m_txControlBlock.m_nbBlocksFEC; + m_remoteHostAddress.setAddress(dataFrame->m_txControlBlock.m_dataAddress); + uint16_t dataPort = dataFrame->m_txControlBlock.m_dataPort; + RemoteSuperBlock *txBlockx = dataFrame->m_superBlocks; if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode { if (m_udpSocket) { - for (int i = 0; i < RemoteNbOrginalBlocks; i++) { // send block via UDP + for (int i = 0; i < RemoteNbOrginalBlocks; i++) { // send blocks via UDP m_udpSocket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_remoteHostAddress, dataPort); } } @@ -130,7 +130,7 @@ void RemoteOutputSender::sendDataBlock(RemoteDataBlock *dataBlock) { qWarning("RemoteSinkSender::handleDataBlock: CM256 encode failed. Transmit without FEC."); cm256Params.RecoveryCount = 0; - RemoteSuperBlock& superBlock = dataBlock->m_superBlocks[0]; // first block + RemoteSuperBlock& superBlock = dataFrame->m_superBlocks[0]; // first block RemoteMetaDataFEC *destMeta = (RemoteMetaDataFEC *) &superBlock.m_protectedBlock; destMeta->m_nbFECBlocks = 0; boost::crc_32_type crc32; @@ -146,11 +146,11 @@ void RemoteOutputSender::sendDataBlock(RemoteDataBlock *dataBlock) // Transmit all blocks if (m_udpSocket) { - for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) { // send block via UDP + for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) { // send blocks via UDP m_udpSocket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_remoteHostAddress, dataPort); } } } - dataBlock->m_txControlBlock.m_processed = true; + dataFrame->m_txControlBlock.m_processed = true; } diff --git a/plugins/samplesink/remoteoutput/remoteoutputsender.h b/plugins/samplesink/remoteoutput/remoteoutputsender.h index a7ec2e369..0315a6b95 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputsender.h +++ b/plugins/samplesink/remoteoutput/remoteoutputsender.h @@ -36,7 +36,7 @@ #include "remoteoutputfifo.h" -class RemoteDataBlock; +class RemoteDataFrame; class CM256; class QUdpSocket; @@ -47,7 +47,7 @@ public: RemoteOutputSender(); ~RemoteOutputSender(); - RemoteDataBlock *getDataBlock(); + RemoteDataFrame *getDataFrame(); void setDestination(const QString& address, uint16_t port); private: @@ -61,7 +61,7 @@ private: uint16_t m_remotePort; QHostAddress m_remoteHostAddress; - void sendDataBlock(RemoteDataBlock *dataBlock); + void sendDataFrame(RemoteDataFrame *dataFrame); private slots: void handleData(); diff --git a/plugins/samplesink/remoteoutput/remoteoutputworker.cpp b/plugins/samplesink/remoteoutput/remoteoutputworker.cpp index e295bf7d4..7c54fee62 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputworker.cpp +++ b/plugins/samplesink/remoteoutput/remoteoutputworker.cpp @@ -120,6 +120,7 @@ void RemoteOutputWorker::tick() SampleVector& data = m_sampleFifo->getData(); unsigned int iPart1Begin, iPart1End, iPart2Begin, iPart2End; m_sampleFifo->read(m_samplesChunkSize, iPart1Begin, iPart1End, iPart2Begin, iPart2End); + m_samplesCount += m_samplesChunkSize; if (iPart1Begin != iPart1End) { diff --git a/plugins/samplesink/remoteoutput/remoteoutputworker.h b/plugins/samplesink/remoteoutput/remoteoutputworker.h index 36a993b53..8a20ab52b 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputworker.h +++ b/plugins/samplesink/remoteoutput/remoteoutputworker.h @@ -53,8 +53,8 @@ public: bool isRunning() const { return m_running; } + uint32_t getSamplesCount() const { return m_samplesCount; } uint32_t getSamplesCount(uint64_t& ts_usecs) const; - void setSamplesCount(int samplesCount) { m_samplesCount = samplesCount; } void setChunkCorrection(int chunkCorrection) { m_chunkCorrection = chunkCorrection; } void connectTimer(const QTimer& timer); diff --git a/plugins/samplesink/remoteoutput/udpsinkfec.cpp b/plugins/samplesink/remoteoutput/udpsinkfec.cpp index fac7a8dd2..1aaf3629a 100644 --- a/plugins/samplesink/remoteoutput/udpsinkfec.cpp +++ b/plugins/samplesink/remoteoutput/udpsinkfec.cpp @@ -32,7 +32,7 @@ UDPSinkFEC::UDPSinkFEC() : m_nbSamples(0), m_nbBlocksFEC(0), m_txDelayRatio(0.0), - m_dataBlock(nullptr), + m_dataFrame(nullptr), m_txBlockIndex(0), m_txBlocksIndex(0), m_frameCount(0), @@ -115,14 +115,14 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk metaData.m_tv_sec = nowus / 1000000UL; // tv.tv_sec; metaData.m_tv_usec = nowus % 1000000UL; // tv.tv_usec; - if (!m_dataBlock) { // on the very first cycle there is no data block allocated - m_dataBlock = m_remoteOutputSender->getDataBlock(); // ask a new block to sender + if (!m_dataFrame) { // on the very first cycle there is no data block allocated + m_dataFrame = m_remoteOutputSender->getDataFrame(); // ask a new block to sender } boost::crc_32_type crc32; crc32.process_bytes(&metaData, sizeof(RemoteMetaDataFEC)-4); metaData.m_crc32 = crc32.checksum(); - RemoteSuperBlock& superBlock = m_dataBlock->m_superBlocks[0]; // first block + RemoteSuperBlock& superBlock = m_dataFrame->m_superBlocks[0]; // first block superBlock.init(); superBlock.m_header.m_frameIndex = m_frameCount; superBlock.m_header.m_blockIndex = m_txBlockIndex; @@ -172,18 +172,18 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk m_superBlock.m_header.m_blockIndex = m_txBlockIndex; m_superBlock.m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); m_superBlock.m_header.m_sampleBits = SDR_RX_SAMP_SZ; - m_dataBlock->m_superBlocks[m_txBlockIndex] = m_superBlock; + m_dataFrame->m_superBlocks[m_txBlockIndex] = m_superBlock; if (m_txBlockIndex == RemoteNbOrginalBlocks - 1) // frame complete { - m_dataBlock->m_txControlBlock.m_frameIndex = m_frameCount; - m_dataBlock->m_txControlBlock.m_processed = false; - m_dataBlock->m_txControlBlock.m_complete = true; - m_dataBlock->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC; - m_dataBlock->m_txControlBlock.m_dataAddress = m_remoteAddress; - m_dataBlock->m_txControlBlock.m_dataPort = m_remotePort; + m_dataFrame->m_txControlBlock.m_frameIndex = m_frameCount; + m_dataFrame->m_txControlBlock.m_processed = false; + m_dataFrame->m_txControlBlock.m_complete = true; + m_dataFrame->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC; + m_dataFrame->m_txControlBlock.m_dataAddress = m_remoteAddress; + m_dataFrame->m_txControlBlock.m_dataPort = m_remotePort; - m_dataBlock = m_remoteOutputSender->getDataBlock(); // ask a new block to sender + m_dataFrame = m_remoteOutputSender->getDataFrame(); // ask a new block to sender m_txBlockIndex = 0; m_frameCount++; diff --git a/plugins/samplesink/remoteoutput/udpsinkfec.h b/plugins/samplesink/remoteoutput/udpsinkfec.h index 04a4e1455..28fddbd31 100644 --- a/plugins/samplesink/remoteoutput/udpsinkfec.h +++ b/plugins/samplesink/remoteoutput/udpsinkfec.h @@ -86,7 +86,7 @@ private: RemoteMetaDataFEC m_currentMetaFEC; //!< Meta data for current frame uint32_t m_nbBlocksFEC; //!< Variable number of FEC blocks float m_txDelayRatio; //!< Delay in ratio of nominal frame period - RemoteDataBlock *m_dataBlock; + RemoteDataFrame *m_dataFrame; RemoteSuperBlock m_superBlock; //!< current super block being built int m_txBlockIndex; //!< Current index in blocks to transmit in the Tx row int m_txBlocksIndex; //!< Current index of Tx blocks row diff --git a/sdrbase/channel/remotedatablock.h b/sdrbase/channel/remotedatablock.h index 09c6973fa..e1b094baf 100644 --- a/sdrbase/channel/remotedatablock.h +++ b/sdrbase/channel/remotedatablock.h @@ -157,13 +157,13 @@ struct RemoteRxControlBlock } }; -class RemoteDataBlock +class RemoteDataFrame { public: - RemoteDataBlock() { - m_superBlocks = new RemoteSuperBlock[256]; + RemoteDataFrame() { + m_superBlocks = new RemoteSuperBlock[256]; //!< 128 original bloks + 128 possible recovery blocks } - ~RemoteDataBlock() { + ~RemoteDataFrame() { delete[] m_superBlocks; } RemoteTxControlBlock m_txControlBlock; diff --git a/sdrbase/channel/remotedataqueue.cpp b/sdrbase/channel/remotedataqueue.cpp index 96edd3d64..fc334fecf 100644 --- a/sdrbase/channel/remotedataqueue.cpp +++ b/sdrbase/channel/remotedataqueue.cpp @@ -30,14 +30,13 @@ RemoteDataQueue::RemoteDataQueue(QObject* parent) : QObject(parent), m_lock(QMutex::Recursive), - m_queue(), - m_count(0) + m_queue() { } RemoteDataQueue::~RemoteDataQueue() { - RemoteDataBlock* data; + RemoteDataFrame* data; while ((data = pop()) != nullptr) { @@ -46,14 +45,13 @@ RemoteDataQueue::~RemoteDataQueue() } } -void RemoteDataQueue::push(RemoteDataBlock* data, bool emitSignal) +void RemoteDataQueue::push(RemoteDataFrame* data, bool emitSignal) { if (data) { m_lock.lock(); m_queue.enqueue(data); - m_count++; - // qDebug("RemoteDataQueue::push: %d %d", m_count, m_queue.size()); + // qDebug("RemoteDataQueue::push: %d", m_queue.size()); m_lock.unlock(); } @@ -62,14 +60,13 @@ void RemoteDataQueue::push(RemoteDataBlock* data, bool emitSignal) } } -RemoteDataBlock* RemoteDataQueue::pop() +RemoteDataFrame* RemoteDataQueue::pop() { QMutexLocker locker(&m_lock); if (m_queue.isEmpty()) { return nullptr; } else { - m_count--; return m_queue.dequeue(); } } diff --git a/sdrbase/channel/remotedataqueue.h b/sdrbase/channel/remotedataqueue.h index c0d83ee84..51ec1df39 100644 --- a/sdrbase/channel/remotedataqueue.h +++ b/sdrbase/channel/remotedataqueue.h @@ -31,17 +31,17 @@ #include "export.h" -class RemoteDataBlock; +class RemoteDataFrame; class SDRBASE_API RemoteDataQueue : public QObject { Q_OBJECT public: - RemoteDataQueue(QObject* parent = NULL); + RemoteDataQueue(QObject* parent = nullptr); ~RemoteDataQueue(); - void push(RemoteDataBlock* dataBlock, bool emitSignal = true); //!< Push daa block onto queue - RemoteDataBlock* pop(); //!< Pop message from queue + void push(RemoteDataFrame* dataFrame, bool emitSignal = true); //!< Push data frame onto queue + RemoteDataFrame* pop(); //!< Pop frame from queue int size(); //!< Returns queue size void clear(); //!< Empty queue @@ -51,8 +51,7 @@ signals: private: QMutex m_lock; - QQueue<RemoteDataBlock*> m_queue; - int m_count; + QQueue<RemoteDataFrame*> m_queue; }; #endif /* CHANNEL_REMOTEDATAQUEUE_H_ */ diff --git a/sdrbase/channel/remotedatareadqueue.cpp b/sdrbase/channel/remotedatareadqueue.cpp index 9c827c8e0..8eaf29138 100644 --- a/sdrbase/channel/remotedatareadqueue.cpp +++ b/sdrbase/channel/remotedatareadqueue.cpp @@ -28,17 +28,16 @@ const uint32_t RemoteDataReadQueue::MinimumMaxSize = 10; RemoteDataReadQueue::RemoteDataReadQueue() : - m_dataBlock(nullptr), + m_dataFrame(nullptr), m_maxSize(MinimumMaxSize), m_blockIndex(1), m_sampleIndex(0), - m_sampleCount(0), - m_full(false) + m_sampleCount(0) {} RemoteDataReadQueue::~RemoteDataReadQueue() { - RemoteDataBlock* data; + RemoteDataFrame* data; while ((data = pop()) != nullptr) { @@ -47,27 +46,18 @@ RemoteDataReadQueue::~RemoteDataReadQueue() } } -void RemoteDataReadQueue::push(RemoteDataBlock* dataBlock) +void RemoteDataReadQueue::push(RemoteDataFrame* dataFrame) { - if (length() >= m_maxSize) - { + if (length() < m_maxSize) { + m_dataReadQueue.enqueue(dataFrame); + } else { qWarning("RemoteDataReadQueue::push: queue is full"); - m_full = true; // stop filling the queue - RemoteDataBlock *data = m_dataReadQueue.takeLast(); - delete data; - } - - if (m_full) { - m_full = (length() > m_maxSize/10); // do not fill queue again before queue is half size - } - - if (!m_full) { - m_dataReadQueue.enqueue(dataBlock); } } -RemoteDataBlock* RemoteDataReadQueue::pop() +RemoteDataFrame* RemoteDataReadQueue::pop() { + if (m_dataReadQueue.isEmpty()) { return nullptr; @@ -76,7 +66,6 @@ RemoteDataBlock* RemoteDataReadQueue::pop() { m_blockIndex = 1; m_sampleIndex = 0; - return m_dataReadQueue.dequeue(); } } @@ -91,13 +80,15 @@ void RemoteDataReadQueue::setSize(uint32_t size) void RemoteDataReadQueue::readSample(Sample& s, bool scaleForTx) { // depletion/repletion state - if (m_dataBlock == nullptr) + if (m_dataFrame == nullptr) { - if (length() >= m_maxSize/10) + m_dataFrame = pop(); + + if (m_dataFrame) { - qDebug("RemoteDataReadQueue::readSample: initial pop new block: queue size: %u", length()); + qDebug("RemoteDataReadQueue::readSample: initial pop new frame: queue size: %u", length()); m_blockIndex = 1; - m_dataBlock = m_dataReadQueue.dequeue(); + m_sampleIndex = 0; convertDataToSample(s, m_blockIndex, m_sampleIndex, scaleForTx); m_sampleIndex++; m_sampleCount++; @@ -110,7 +101,7 @@ void RemoteDataReadQueue::readSample(Sample& s, bool scaleForTx) return; } - int sampleSize = m_dataBlock->m_superBlocks[m_blockIndex].m_header.m_sampleBytes * 2; + int sampleSize = m_dataFrame->m_superBlocks[m_blockIndex].m_header.m_sampleBytes * 2; uint32_t samplesPerBlock = RemoteNbBytesPerBlock / sampleSize; if (m_sampleIndex < samplesPerBlock) @@ -132,28 +123,24 @@ void RemoteDataReadQueue::readSample(Sample& s, bool scaleForTx) } else { - delete m_dataBlock; - m_dataBlock = nullptr; + delete m_dataFrame; + m_dataFrame = nullptr; - if (length() == 0) { - qWarning("RemoteDataReadQueue::readSample: try to pop new block but queue is empty"); - } + m_dataFrame = pop(); - if (length() > 0) + if (m_dataFrame) { m_blockIndex = 1; - m_dataBlock = m_dataReadQueue.dequeue(); + m_sampleIndex = 0; convertDataToSample(s, m_blockIndex, m_sampleIndex, scaleForTx); m_sampleIndex++; m_sampleCount++; } else { + qWarning("RemoteDataReadQueue::readSample: try to pop new block but queue is empty"); s = Sample{0, 0}; } } } } - - - diff --git a/sdrbase/channel/remotedatareadqueue.h b/sdrbase/channel/remotedatareadqueue.h index 8ecf60d68..1f62ad48f 100644 --- a/sdrbase/channel/remotedatareadqueue.h +++ b/sdrbase/channel/remotedatareadqueue.h @@ -29,7 +29,7 @@ #include "export.h" -class RemoteDataBlock; +class RemoteDataFrame; struct Sample; class SDRBASE_API RemoteDataReadQueue @@ -38,7 +38,7 @@ public: RemoteDataReadQueue(); ~RemoteDataReadQueue(); - void push(RemoteDataBlock* dataBlock); //!< push block on the queue + void push(RemoteDataFrame* dataFrame); //!< push frame on the queue void readSample(Sample& s, bool scaleForTx = false); //!< Read sample from queue possibly scaling to Tx size uint32_t length() const { return m_dataReadQueue.size(); } //!< Returns queue length uint32_t size() const { return m_maxSize; } //!< Returns queue size (max length) @@ -48,26 +48,25 @@ public: static const uint32_t MinimumMaxSize; private: - QQueue<RemoteDataBlock*> m_dataReadQueue; - RemoteDataBlock *m_dataBlock; + QQueue<RemoteDataFrame*> m_dataReadQueue; + RemoteDataFrame *m_dataFrame; uint32_t m_maxSize; uint32_t m_blockIndex; uint32_t m_sampleIndex; uint32_t m_sampleCount; //!< use a counter capped below 2^31 as it is going to be converted to an int in the web interface - bool m_full; //!< full condition was hit - RemoteDataBlock* pop(); //!< Pop block from the queue + RemoteDataFrame* pop(); //!< Pop frame from the queue inline void convertDataToSample(Sample& s, uint32_t blockIndex, uint32_t sampleIndex, bool scaleForTx) { - int sampleSize = m_dataBlock->m_superBlocks[blockIndex].m_header.m_sampleBytes * 2; // I/Q sample size in data block - int samplebits = m_dataBlock->m_superBlocks[blockIndex].m_header.m_sampleBits; // I or Q sample size in bits + int sampleSize = m_dataFrame->m_superBlocks[blockIndex].m_header.m_sampleBytes * 2; // I/Q sample size in data block + int samplebits = m_dataFrame->m_superBlocks[blockIndex].m_header.m_sampleBits; // I or Q sample size in bits int32_t iconv, qconv; if ((sizeof(Sample) == 4) && (sampleSize == 8)) // generally 24->16 bits { - iconv = ((int32_t*) &(m_dataBlock->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize]))[0]; - qconv = ((int32_t*) &(m_dataBlock->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize+4]))[0]; + iconv = ((int32_t*) &(m_dataFrame->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize]))[0]; + qconv = ((int32_t*) &(m_dataFrame->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize+4]))[0]; iconv >>= scaleForTx ? (SDR_TX_SAMP_SZ-SDR_RX_SAMP_SZ) : (samplebits-SDR_RX_SAMP_SZ); qconv >>= scaleForTx ? (SDR_TX_SAMP_SZ-SDR_RX_SAMP_SZ) : (samplebits-SDR_RX_SAMP_SZ); s.setReal(iconv); @@ -75,8 +74,8 @@ private: } else if ((sizeof(Sample) == 8) && (sampleSize == 4)) // generally 16->24 bits { - iconv = ((int16_t*) &(m_dataBlock->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize]))[0]; - qconv = ((int16_t*) &(m_dataBlock->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize+2]))[0]; + iconv = ((int16_t*) &(m_dataFrame->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize]))[0]; + qconv = ((int16_t*) &(m_dataFrame->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize+2]))[0]; iconv <<= scaleForTx ? (SDR_TX_SAMP_SZ-samplebits) : (SDR_RX_SAMP_SZ-samplebits); qconv <<= scaleForTx ? (SDR_TX_SAMP_SZ-samplebits) : (SDR_RX_SAMP_SZ-samplebits); s.setReal(iconv); @@ -84,7 +83,7 @@ private: } else if ((sampleSize == 4) || (sampleSize == 8)) // generally 16->16 or 24->24 bits { - s = *((Sample*) &(m_dataBlock->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize])); + s = *((Sample*) &(m_dataFrame->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize])); } else // invalid size {