/////////////////////////////////////////////////////////////////////////////////// // Copyright (C) 2018-2019 Edouard Griffiths, F4EXB // // // // This program is free software; you can redistribute it and/or modify // // it under the terms of the GNU General Public License as published by // // the Free Software Foundation as version 3 of the License, or // // (at your option) any later version. // // // // This program is distributed in the hope that it will be useful, // // but WITHOUT ANY WARRANTY; without even the implied warranty of // // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // // GNU General Public License V3 for more details. // // // // You should have received a copy of the GNU General Public License // // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// #include #include #include #include #include "cm256cc/cm256.h" #include "remotesourceworker.h" MESSAGE_CLASS_DEFINITION(RemoteSourceWorker::MsgDataBind, Message) RemoteSourceWorker::RemoteSourceWorker(RemoteDataQueue *dataQueue, QObject* parent) : QObject(parent), m_running(false), m_dataQueue(dataQueue), m_address(QHostAddress::LocalHost), m_socket(nullptr), m_sampleRate(0) { std::fill(m_dataBlocks, m_dataBlocks+4, (RemoteDataBlock *) 0); connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); } RemoteSourceWorker::~RemoteSourceWorker() { qDebug("RemoteSourceWorker::~RemoteSourceWorker"); } void RemoteSourceWorker::dataBind(const QString& address, uint16_t port) { MsgDataBind *msg = MsgDataBind::create(address, port); m_inputMessageQueue.push(msg); } void RemoteSourceWorker::startWork() { qDebug("RemoteSourceWorker::startWork"); m_socket = new QUdpSocket(this); m_socket->setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, getDataSocketBufferSize(m_sampleRate)); m_running = false; } void RemoteSourceWorker::stopWork() { qDebug("RemoteSourceWorker::stopWork"); delete m_socket; m_socket = nullptr; m_running = false; } void RemoteSourceWorker::handleInputMessages() { Message* message; while ((message = m_inputMessageQueue.pop()) != 0) { if (MsgDataBind::match(*message)) { MsgDataBind* notif = (MsgDataBind*) message; qDebug("RemoteSourceWorker::handleInputMessages: MsgDataBind: %s:%d", qPrintable(notif->getAddress().toString()), notif->getPort()); if (m_socket) { disconnect(m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams())); m_socket->bind(notif->getAddress(), notif->getPort()); connect(m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams())); } } } } void RemoteSourceWorker::readPendingDatagrams() { RemoteSuperBlock superBlock; qint64 size; while (m_socket->hasPendingDatagrams()) { QHostAddress sender; quint16 senderPort = 0; //qint64 pendingDataSize = m_socket->pendingDatagramSize(); size = m_socket->readDatagram((char *) &superBlock, (long long int) sizeof(RemoteSuperBlock), &sender, &senderPort); if (size == sizeof(RemoteSuperBlock)) { unsigned int dataBlockIndex = superBlock.m_header.m_frameIndex % m_nbDataBlocks; int blockIndex = superBlock.m_header.m_blockIndex; if (blockIndex == 0) // first block with meta data { const RemoteMetaDataFEC *metaData = (const RemoteMetaDataFEC *) &superBlock.m_protectedBlock; uint32_t sampleRate = metaData->m_sampleRate; if (m_sampleRate != sampleRate) { qDebug("RemoteSourceWorker::readPendingDatagrams: sampleRate: %u", sampleRate); m_socket->setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, getDataSocketBufferSize(sampleRate)); m_sampleRate = sampleRate; } } // create the first block for this index if (m_dataBlocks[dataBlockIndex] == 0) { m_dataBlocks[dataBlockIndex] = new RemoteDataBlock(); } if (m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex < 0) { // initialize virgin block with the frame index m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex; } else { // if the frame index is not the same for the same slot it means we are starting a new frame uint32_t frameIndex = m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex; if (superBlock.m_header.m_frameIndex != frameIndex) { //qDebug("RemoteSourceWorker::readPendingDatagrams: push frame %u", frameIndex); m_dataQueue->push(m_dataBlocks[dataBlockIndex]); m_dataBlocks[dataBlockIndex] = new RemoteDataBlock(); m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex; } } m_dataBlocks[dataBlockIndex]->m_superBlocks[superBlock.m_header.m_blockIndex] = superBlock; if (superBlock.m_header.m_blockIndex == 0) { m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_metaRetrieved = true; } if (superBlock.m_header.m_blockIndex < RemoteNbOrginalBlocks) { m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_originalCount++; } else { m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_recoveryCount++; } m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_blockCount++; } else { qWarning("RemoteSourceWorker::readPendingDatagrams: wrong super block size not processing"); } } } int RemoteSourceWorker::getDataSocketBufferSize(uint32_t inSampleRate) { // set a floor value at 24 kS/s uint32_t samplerate = inSampleRate < 24000 ? 24000 : inSampleRate; // 250 ms (1/4s) at current sample rate int bufferSize = (samplerate * 2 * (SDR_RX_SAMP_SZ == 16 ? 2 : 4)) / 4; qDebug("RemoteSourceWorker::getDataSocketBufferSize: %d bytes", bufferSize); return bufferSize; }