From 70adea206cc6878577f8e1527fc2162d658684f8 Mon Sep 17 00:00:00 2001 From: f4exb Date: Wed, 31 Jan 2018 00:40:54 +0100 Subject: [PATCH] RTP audio sink fixes and test in NFM demod (endianess problem) --- plugins/channelrx/demodnfm/nfmdemod.cpp | 6 + .../channelrx/demodnfm/nfmdemodsettings.cpp | 2 +- sdrbase/CMakeLists.txt | 4 + sdrbase/audio/audionetsink.cpp | 84 +++---- sdrbase/audio/audionetsink.h | 6 +- sdrbase/util/rtpsink.cpp | 220 ++++++++++++++++++ sdrbase/util/rtpsink.h | 110 ++------- 7 files changed, 289 insertions(+), 143 deletions(-) create mode 100644 sdrbase/util/rtpsink.cpp diff --git a/plugins/channelrx/demodnfm/nfmdemod.cpp b/plugins/channelrx/demodnfm/nfmdemod.cpp index 348f7819f..c1d2e1a57 100644 --- a/plugins/channelrx/demodnfm/nfmdemod.cpp +++ b/plugins/channelrx/demodnfm/nfmdemod.cpp @@ -83,6 +83,12 @@ NFMDemod::NFMDemod(DeviceSourceAPI *devieAPI) : m_audioNetSink = new AudioNetSink(this); m_audioNetSink->setDestination(m_settings.m_udpAddress, m_settings.m_udpPort); + if (m_audioNetSink->selectType(AudioNetSink::SinkRTP)) { + qDebug("NFMDemod::NFMDemod: set audio sink to RTP mode"); + } else { + qWarning("NFMDemod::NFMDemod: RTP support for audio sink not available. Fall back too UDP"); + } + m_channelizer = new DownChannelizer(this); m_threadedChannelizer = new ThreadedBasebandSampleSink(m_channelizer, this); m_deviceAPI->addThreadedSink(m_threadedChannelizer); diff --git a/plugins/channelrx/demodnfm/nfmdemodsettings.cpp b/plugins/channelrx/demodnfm/nfmdemodsettings.cpp index 09695ad63..6699524da 100644 --- a/plugins/channelrx/demodnfm/nfmdemodsettings.cpp +++ b/plugins/channelrx/demodnfm/nfmdemodsettings.cpp @@ -52,7 +52,7 @@ void NFMDemodSettings::resetToDefaults() m_audioSampleRate = DSPEngine::instance()->getAudioSampleRate(); m_copyAudioToUDP = false; m_udpAddress = "127.0.0.1"; - m_udpPort = 9999; + m_udpPort = 9998; m_rgbColor = QColor(255, 0, 0).rgb(); m_title = "NFM Demodulator"; } diff --git a/sdrbase/CMakeLists.txt b/sdrbase/CMakeLists.txt index 7fdbddfcd..44bcefa3a 100644 --- a/sdrbase/CMakeLists.txt +++ b/sdrbase/CMakeLists.txt @@ -216,6 +216,10 @@ if (JRTPLIB_FOUND) ${sdrbase_HEADERS} util/rtpsink.h ) + set(sdrbase_SOURCES + ${sdrbase_SOURCES} + util/rtpsink.cpp + ) add_definitions(-DHAS_JRTPLIB) include_directories(${JRTPLIB_INCLUDE_DIR}) endif(JRTPLIB_FOUND) diff --git a/sdrbase/audio/audionetsink.cpp b/sdrbase/audio/audionetsink.cpp index c5331d741..76415347d 100644 --- a/sdrbase/audio/audionetsink.cpp +++ b/sdrbase/audio/audionetsink.cpp @@ -26,33 +26,33 @@ const int AudioNetSink::m_udpBlockSize = 512; AudioNetSink::AudioNetSink(QObject *parent, bool stereo) : m_type(SinkUDP), m_udpBufferAudioMono(0), - m_udpBufferAudioStereo(0) + m_udpBufferAudioStereo(0), + m_rtpBufferAudio(0) { - if (stereo) - { + if (stereo) { m_udpBufferAudioStereo = new UDPSink(parent, m_udpBlockSize); -#ifdef HAS_JRTPLIB - m_rtpBufferAudioStereo = new RTPSink("127.0.0.1", 9999, 48000); - m_rtpBufferAudioMono = 0; -#endif - } - else - { + } else { m_udpBufferAudioMono = new UDPSink(parent, m_udpBlockSize); -#ifdef HAS_JRTPLIB - m_rtpBufferAudioMono = new RTPSink("127.0.0.1", 9999, 48000); - m_rtpBufferAudioStereo = 0; -#endif } + +#ifdef HAS_JRTPLIB + m_rtpBufferAudio = new RTPSink("127.0.0.1", 9999, stereo ? RTPSink::PayloadL16Stereo : RTPSink::PayloadL16Mono); +#endif } AudioNetSink::~AudioNetSink() { - if (m_udpBufferAudioMono) { delete m_udpBufferAudioMono; } - if (m_udpBufferAudioStereo) { delete m_udpBufferAudioStereo; } + if (m_udpBufferAudioMono) { + delete m_udpBufferAudioMono; + } + + if (m_udpBufferAudioStereo) { + delete m_udpBufferAudioStereo; + } #ifdef HAS_JRTPLIB - if (m_rtpBufferAudioMono) { delete m_rtpBufferAudioMono; } - if (m_rtpBufferAudioStereo) { delete m_rtpBufferAudioStereo; } + if (m_rtpBufferAudio) { + delete m_rtpBufferAudio; + } #endif } @@ -87,51 +87,39 @@ void AudioNetSink::setDestination(const QString& address, uint16_t port) if (m_udpBufferAudioStereo) { m_udpBufferAudioStereo->setDestination(address, port); } + #ifdef HAS_JRTPLIB - if (m_rtpBufferAudioMono) { - m_rtpBufferAudioMono->setDestination(address, port); - } - if (m_rtpBufferAudioStereo) { - m_rtpBufferAudioStereo->setDestination(address, port); + if (m_rtpBufferAudio) { + m_rtpBufferAudio->setDestination(address, port); } #endif } +#ifdef HAS_JRTPLIB void AudioNetSink::addDestination(const QString& address, uint16_t port) { -#ifdef HAS_JRTPLIB - if (m_rtpBufferAudioMono) { - m_rtpBufferAudioMono->addDestination(address, port); + if (m_rtpBufferAudio) { + m_rtpBufferAudio->addDestination(address, port); } - if (m_rtpBufferAudioStereo) { - m_rtpBufferAudioStereo->addDestination(address, port); - } -#endif } +#else +void AudioNetSink::addDestination(const QString& address __attribute__((unused)), uint16_t port __attribute__((unused))) +{ +} +#endif +#ifdef HAS_JRTPLIB void AudioNetSink::deleteDestination(const QString& address, uint16_t port) { -#ifdef HAS_JRTPLIB - if (m_rtpBufferAudioMono) { - m_rtpBufferAudioMono->deleteDestination(address, port); + if (m_rtpBufferAudio) { + m_rtpBufferAudio->deleteDestination(address, port); } - if (m_rtpBufferAudioStereo) { - m_rtpBufferAudioStereo->deleteDestination(address, port); - } -#endif } - -void AudioNetSink::setSampleRate(int sampleRate) +#else +void AudioNetSink::deleteDestination(const QString& address __attribute__((unused)), uint16_t port __attribute__((unused))) { -#ifdef HAS_JRTPLIB - if (m_rtpBufferAudioMono) { - m_rtpBufferAudioMono->setSampleRate(sampleRate); - } - if (m_rtpBufferAudioStereo) { - m_rtpBufferAudioStereo->setSampleRate(sampleRate); - } -#endif } +#endif void AudioNetSink::write(qint16 sample) { @@ -143,6 +131,7 @@ void AudioNetSink::write(qint16 sample) m_udpBufferAudioMono->write(sample); } else if (m_type == SinkRTP) { #ifdef HAS_JRTPLIB + m_rtpBufferAudio->write((uint8_t *) &sample); #endif } } @@ -157,6 +146,7 @@ void AudioNetSink::write(const AudioSample& sample) m_udpBufferAudioStereo->write(sample); } else if (m_type == SinkRTP) { #ifdef HAS_JRTPLIB + m_rtpBufferAudio->write((uint8_t *) &sample); #endif } } diff --git a/sdrbase/audio/audionetsink.h b/sdrbase/audio/audionetsink.h index 6381af744..d1b1218e0 100644 --- a/sdrbase/audio/audionetsink.h +++ b/sdrbase/audio/audionetsink.h @@ -23,7 +23,7 @@ #include "util/export.h" template class UDPSink; -template class RTPSink; +class RTPSink; class SDRANGEL_API AudioNetSink { public: @@ -39,7 +39,6 @@ public: void setDestination(const QString& address, uint16_t port); void addDestination(const QString& address, uint16_t port); void deleteDestination(const QString& address, uint16_t port); - void setSampleRate(int sampleRate); void write(qint16 sample); void write(const AudioSample& sample); @@ -52,8 +51,7 @@ protected: SinkType m_type; UDPSink *m_udpBufferAudioMono; UDPSink *m_udpBufferAudioStereo; - RTPSink *m_rtpBufferAudioMono; - RTPSink *m_rtpBufferAudioStereo; + RTPSink *m_rtpBufferAudio; }; diff --git a/sdrbase/util/rtpsink.cpp b/sdrbase/util/rtpsink.cpp new file mode 100644 index 000000000..605b39076 --- /dev/null +++ b/sdrbase/util/rtpsink.cpp @@ -0,0 +1,220 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2018 F4EXB // +// written by Edouard Griffiths // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "rtpsink.h" +#include "dsp/dsptypes.h" + +RTPSink::RTPSink(const QString& address, uint16_t port, PayloadType payloadType) : + m_sampleRate(48000), + m_sampleBytes(0), + m_packetSamples(0), + m_bufferSize(0), + m_sampleBufferIndex(0), + m_byteBuffer(0), + m_destport(port), + m_mutex(QMutex::Recursive) +{ + qDebug("RTPSink::RTPSink"); + m_destip = inet_addr(address.toStdString().c_str()); + m_destip = ntohl(m_destip); + + m_rtpSessionParams.SetOwnTimestampUnit(1.0 / (double) m_sampleRate); + int status = m_rtpSession.Create(m_rtpSessionParams); + + if (status < 0) { + qCritical("RTPSink::RTPSink: cannot create session: %s", jrtplib::RTPGetErrorString(status).c_str()); + } else { + qDebug("RTPSink::RTPSink: created session: %s", jrtplib::RTPGetErrorString(status).c_str()); + } + + setPayloadType(payloadType); +} + +RTPSink::~RTPSink() +{ + jrtplib::RTPTime delay = jrtplib::RTPTime(10.0); + m_rtpSession.BYEDestroy(delay, "Time's up", 9); + + if (m_byteBuffer) { + delete[] m_byteBuffer; + } +} + +void RTPSink::setPayloadType(PayloadType payloadType) +{ + QMutexLocker locker(&m_mutex); + + qDebug("RTPSink::setPayloadType: %d", payloadType); + + switch (payloadType) + { + case PayloadL16Stereo: + m_sampleRate = 48000; + m_sampleBytes = sizeof(AudioSample); + m_rtpSession.SetDefaultPayloadType(96); + break; + case PayloadL16Mono: + default: + m_sampleRate = 48000; + m_sampleBytes = sizeof(int16_t); + m_rtpSession.SetDefaultPayloadType(96); + break; + } + + m_packetSamples = m_sampleRate/50; // 20ms packet samples + m_bufferSize = m_packetSamples * m_sampleBytes; + + if (m_byteBuffer) { + delete[] m_byteBuffer; + } + + m_byteBuffer = new uint8_t[m_bufferSize]; + m_sampleBufferIndex = 0; + m_payloadType = payloadType; + + int status = m_rtpSession.SetTimestampUnit(1.0 / (double) m_sampleRate); + + if (status < 0) { + qCritical("RTPSink::setPayloadType: cannot set timestamp unit: %s", jrtplib::RTPGetErrorString(status).c_str()); + } else { + qDebug("RTPSink::setPayloadType: timestamp unit set to %f: %s", + 1.0 / (double) m_sampleRate, + jrtplib::RTPGetErrorString(status).c_str()); + } + + status = m_rtpSession.SetDefaultMark(false); + + if (status < 0) { + qCritical("RTPSink::setPayloadType: cannot set default mark: %s", jrtplib::RTPGetErrorString(status).c_str()); + } else { + qDebug("RTPSink::setPayloadType: set default mark to false: %s", jrtplib::RTPGetErrorString(status).c_str()); + } + + status = m_rtpSession.SetDefaultTimestampIncrement(m_packetSamples); + + if (status < 0) { + qCritical("RTPSink::setPayloadType: cannot set default timestamp increment: %s", jrtplib::RTPGetErrorString(status).c_str()); + } else { + qDebug("RTPSink::setPayloadType: set default timestamp increment to %d: %s", m_packetSamples, jrtplib::RTPGetErrorString(status).c_str()); + } + + status = m_rtpSession.SetMaximumPacketSize(m_bufferSize+40); + + if (status < 0) { + qCritical("RTPSink::setPayloadType: cannot set maximum packet size: %s", jrtplib::RTPGetErrorString(status).c_str()); + } else { + qDebug("RTPSink::setPayloadType: set maximum packet size to %d bytes: %s", m_bufferSize+40, jrtplib::RTPGetErrorString(status).c_str()); + } +} + +void RTPSink::setDestination(const QString& address, uint16_t port) +{ + m_rtpSession.ClearDestinations(); + m_rtpSession.DeleteDestination(jrtplib::RTPIPv4Address(m_destip, m_destport)); + m_destip = inet_addr(address.toStdString().c_str()); + m_destip = ntohl(m_destip); + m_destport = port; + + int status = m_rtpSession.AddDestination(jrtplib::RTPIPv4Address(m_destip, m_destport)); + + if (status < 0) { + qCritical("RTPSink::setDestination: cannot set destination address: %s", jrtplib::RTPGetErrorString(status).c_str()); + } +} + +void RTPSink::deleteDestination(const QString& address, uint16_t port) +{ + uint32_t destip = inet_addr(address.toStdString().c_str()); + destip = ntohl(m_destip); + + int status = m_rtpSession.DeleteDestination(jrtplib::RTPIPv4Address(destip, port)); + + if (status < 0) { + qCritical("RTPSink::deleteDestination: cannot delete destination address: %s", jrtplib::RTPGetErrorString(status).c_str()); + } +} + +void RTPSink::addDestination(const QString& address, uint16_t port) +{ + uint32_t destip = inet_addr(address.toStdString().c_str()); + destip = ntohl(m_destip); + + int status = m_rtpSession.AddDestination(jrtplib::RTPIPv4Address(destip, port)); + + if (status < 0) { + qCritical("RTPSink::addDestination: cannot add destination address: %s", jrtplib::RTPGetErrorString(status).c_str()); + } else { + qDebug("RTPSink::addDestination: destination address set to %s:%d: %s", + address.toStdString().c_str(), + port, + jrtplib::RTPGetErrorString(status).c_str()); + } +} + +void RTPSink::write(uint8_t *sampleByte) +{ + QMutexLocker locker(&m_mutex); + + if (m_sampleBufferIndex < m_packetSamples) + { + memcpy(&m_byteBuffer[m_sampleBufferIndex*m_sampleBytes], sampleByte, m_sampleBytes); + m_sampleBufferIndex++; + } + else + { + int status = m_rtpSession.SendPacket((const void *) m_byteBuffer, (std::size_t) m_bufferSize); + + if (status < 0) { + qCritical("RTPSink::write: cannot write packet: %s", jrtplib::RTPGetErrorString(status).c_str()); + } + + memcpy(&m_byteBuffer[0], sampleByte, m_sampleBytes); + m_sampleBufferIndex = 1; + } +} + +void RTPSink::write(uint8_t *samples, int nbSamples) +{ + int samplesIndex = 0; + QMutexLocker locker(&m_mutex); + + // fill remainder of buffer and send it + if (m_sampleBufferIndex + nbSamples > m_packetSamples) + { + memcpy(&m_byteBuffer[m_sampleBufferIndex*m_sampleBytes], + &samples[samplesIndex*m_sampleBytes], + (m_packetSamples - m_sampleBufferIndex)*m_sampleBytes); + + m_rtpSession.SendPacket((const void *) m_byteBuffer, (std::size_t) m_bufferSize); + nbSamples -= (m_packetSamples - m_sampleBufferIndex); + m_sampleBufferIndex = 0; + } + + // send complete packets + while (nbSamples > m_packetSamples) + { + m_rtpSession.SendPacket((const void *) &samples[samplesIndex*m_sampleBytes], (std::size_t) m_bufferSize); + samplesIndex += m_packetSamples; + nbSamples -= m_packetSamples; + } + + // copy remainder of input to buffer + memcpy(&m_byteBuffer[m_sampleBufferIndex*m_sampleBytes], + &samples[samplesIndex*m_sampleBytes], + nbSamples*m_sampleBytes); +} + diff --git a/sdrbase/util/rtpsink.h b/sdrbase/util/rtpsink.h index 648f1498f..5a000596e 100644 --- a/sdrbase/util/rtpsink.h +++ b/sdrbase/util/rtpsink.h @@ -30,112 +30,40 @@ #include "rtperrors.h" #include "rtplibraryversion.h" -template class RTPSink { public: - RTPSink(const QString& address, uint16_t port, int sampleRate) : - m_destport(port), - m_sampleBuffer(0), - m_sampleBufferIndex(0), - m_mutex(QMutex::Recursive) + typedef enum { - m_destip = inet_addr(address.toStdString().c_str()); - m_destip = ntohl(m_destip); - m_rtpSessionParams.SetOwnTimestampUnit(1.0/(double) sampleRate); - m_rtpTransmissionParams.SetPortbase(8092); // FIXME: sort this out + PayloadL16Mono, + PayloadL16Stereo, + } PayloadType; - int status = m_rtpSession.Create(m_rtpSessionParams, &m_rtpTransmissionParams); + RTPSink(const QString& address, uint16_t port, PayloadType payloadType = PayloadL16Mono); + ~RTPSink(); - if (status < 0) - { - qCritical("RTPSink::RTPSink: cannot create session: %s", jrtplib::RTPGetErrorString(status).c_str()); - return; - } + void setPayloadType(PayloadType payloadType); - status = m_rtpSession.AddDestination(jrtplib::RTPIPv4Address(m_destip, m_destport)); + void setDestination(const QString& address, uint16_t port); + void deleteDestination(const QString& address, uint16_t port); + void addDestination(const QString& address, uint16_t port); - if (status < 0) - { - qCritical("RTPSink::RTPSink: cannot set destination address: %s", jrtplib::RTPGetErrorString(status).c_str()); - return; - } - - m_sampleBuffer = new SampleType[sampleRate]; // store 1 second - } - - ~RTPSink() - { - jrtplib::RTPTime delay = jrtplib::RTPTime(10.0); - m_rtpSession.BYEDestroy(delay, "Time's up", 9); - - if (m_sampleBuffer) { delete[] m_sampleBuffer; } - } - - void setDestination(const QString& address, uint16_t port) - { - if (!m_sampleBuffer) { return; } - - m_rtpSession.ClearDestinations(); - m_rtpSession.DeleteDestination(jrtplib::RTPIPv4Address(m_destip, m_destport)); - m_destip = inet_addr(address.toStdString().c_str()); - m_destip = ntohl(m_destip); - m_destport = port; - - int status = m_rtpSession.AddDestination(jrtplib::RTPIPv4Address(m_destip, m_destport)); - - if (status < 0) { - qCritical("RTPSink::setDestination: cannot set destination address: %s", jrtplib::RTPGetErrorString(status).c_str()); - } - } - - void deleteDestination(const QString& address, uint16_t port) - { - uint32_t destip = inet_addr(address.toStdString().c_str()); - destip = ntohl(m_destip); - - int status = m_rtpSession.DeleteDestination(jrtplib::RTPIPv4Address(destip, port)); - - if (status < 0) { - qCritical("RTPSink::deleteDestination: cannot delete destination address: %s", jrtplib::RTPGetErrorString(status).c_str()); - } - } - - void addDestination(const QString& address, uint16_t port) - { - uint32_t destip = inet_addr(address.toStdString().c_str()); - destip = ntohl(m_destip); - - int status = m_rtpSession.AddDestination(jrtplib::RTPIPv4Address(destip, port)); - - if (status < 0) { - qCritical("RTPSink::addDestination: cannot add destination address: %s", jrtplib::RTPGetErrorString(status).c_str()); - } - } - - void setSampleRate(int sampleRate) - { - QMutexLocker locker(&m_mutex); - if (m_sampleBuffer) { delete[] m_sampleBuffer; } - m_sampleBuffer = new SampleType[sampleRate]; // store 1 second - - int status = m_rtpSession.SetTimestampUnit(1.0 / (double) sampleRate); - - if (status < 0) - { - qCritical("RTPSink::setSampleRate: cannot set timestamp unit: %s", jrtplib::RTPGetErrorString(status).c_str()); - return; - } - } + void write(uint8_t *sampleByte); + void write(uint8_t *sampleByte, int nbSamples); protected: + PayloadType m_payloadType; + int m_sampleRate; + int m_sampleBytes; + int m_packetSamples; + int m_bufferSize; + int m_sampleBufferIndex; + uint8_t *m_byteBuffer; uint32_t m_destip; uint16_t m_destport; jrtplib::RTPSession m_rtpSession; jrtplib::RTPSessionParams m_rtpSessionParams; jrtplib::RTPUDPv4TransmissionParams m_rtpTransmissionParams; - SampleType *m_sampleBuffer; - int m_sampleBufferIndex; QMutex m_mutex; };