diff --git a/plugins/channelrx/demodnfm/nfmdemod.cpp b/plugins/channelrx/demodnfm/nfmdemod.cpp index f3fb247a4..7509cabdf 100644 --- a/plugins/channelrx/demodnfm/nfmdemod.cpp +++ b/plugins/channelrx/demodnfm/nfmdemod.cpp @@ -77,7 +77,7 @@ NFMDemod::NFMDemod(DeviceSourceAPI *devieAPI) : m_afSquelch.setCoefficients(24, 600, 48000.0, 200, 0); // 0.5ms test period, 300ms average span, 48kS/s SR, 100ms attack, no decay DSPEngine::instance()->addAudioSink(&m_audioFifo); - m_audioNetSink = new AudioNetSink(this); + m_audioNetSink = new AudioNetSink(0); // parent thread allocated dynamically m_audioNetSink->setDestination(m_settings.m_udpAddress, m_settings.m_udpPort); m_channelizer = new DownChannelizer(this); @@ -374,6 +374,14 @@ bool NFMDemod::handleMessage(const Message& cmd) return true; } + else if (BasebandSampleSink::MsgThreadedSink::match(cmd)) + { + BasebandSampleSink::MsgThreadedSink& cfg = (BasebandSampleSink::MsgThreadedSink&) cmd; + const QThread *thread = cfg.getThread(); + qDebug("NFMDemod::handleMessage: BasebandSampleSink::MsgThreadedSink: %p", thread); + m_audioNetSink->moveToThread(const_cast(thread)); // use the thread for udp sinks + return true; + } else if (DSPSignalNotification::match(cmd)) { return true; diff --git a/qrtplib/rtpudptransmitter.cpp b/qrtplib/rtpudptransmitter.cpp index e7650afc6..f4fef5a76 100644 --- a/qrtplib/rtpudptransmitter.cpp +++ b/qrtplib/rtpudptransmitter.cpp @@ -49,7 +49,6 @@ RTPUDPTransmitter::RTPUDPTransmitter() : m_rtcpsock = 0; m_rtpsock = 0; m_waitingfordata = false; - m_closesocketswhendone = false; m_rtcpPort = 0; m_rtpPort = 0; m_receivemode = RTPTransmitter::AcceptAll; @@ -123,35 +122,26 @@ int RTPUDPTransmitter::Create(std::size_t maximumpacketsize, const RTPTransmissi } } - if (params->GetUseExistingSockets(&m_rtpsock, &m_rtcpsock)) + m_rtpsock = new QUdpSocket(); + + // If we're multiplexing, we're just going to set the RTCP socket to equal the RTP socket + if (params->GetRTCPMultiplexing()) { - m_closesocketswhendone = false; + m_rtcpsock = m_rtpsock; + m_rtcpPort = m_rtpPort; + } else { + m_rtcpsock = new QUdpSocket(); } - else + + // set socket buffer sizes + + size = params->GetRTPReceiveBufferSize(); + m_rtpsock->setReadBufferSize(size); + + if (m_rtpsock != m_rtcpsock) { - m_rtpsock = new QUdpSocket(); - - // If we're multiplexing, we're just going to set the RTCP socket to equal the RTP socket - if (params->GetRTCPMultiplexing()) - { - m_rtcpsock = m_rtpsock; - m_rtcpPort = m_rtpPort; - } else { - m_rtcpsock = new QUdpSocket(); - } - - m_closesocketswhendone = true; - - // set socket buffer sizes - - size = params->GetRTPReceiveBufferSize(); - m_rtpsock->setReadBufferSize(size); - - if (m_rtpsock != m_rtcpsock) - { - size = params->GetRTCPReceiveBufferSize(); - m_rtcpsock->setReadBufferSize(size); - } + size = params->GetRTCPReceiveBufferSize(); + m_rtcpsock->setReadBufferSize(size); } m_maxpacksize = maximumpacketsize; @@ -195,15 +185,12 @@ void RTPUDPTransmitter::Destroy() return; } - if (m_closesocketswhendone) - { - if (m_rtpsock != m_rtcpsock) { - delete m_rtcpsock; - } - - delete m_rtpsock; + if (m_rtpsock != m_rtcpsock) { + delete m_rtcpsock; } + delete m_rtpsock; + m_created = false; } diff --git a/qrtplib/rtpudptransmitter.h b/qrtplib/rtpudptransmitter.h index 48fbbf8b3..7b1719d61 100644 --- a/qrtplib/rtpudptransmitter.h +++ b/qrtplib/rtpudptransmitter.h @@ -142,16 +142,6 @@ public: m_forcedrtcpport = rtcpport; } - /** Use sockets that have already been created, no checks on port numbers - * will be done, and no buffer sizes will be set; you'll need to close - * the sockets yourself when done, it will **not** be done automatically. */ - void SetUseExistingSockets(QUdpSocket *rtpsocket, QUdpSocket *rtcpsocket) - { - m_rtpsock = rtpsocket; - m_rtcpsock = rtcpsocket; - m_useexistingsockets = true; - } - /** Returns the RTP socket's send buffer size. */ int GetRTPSendBufferSize() const { @@ -194,19 +184,6 @@ public: return m_forcedrtcpport; } - /** Returns true and fills in sockets if existing sockets were set - * using RTPUDPv4TransmissionParams::SetUseExistingSockets. */ - bool GetUseExistingSockets(QUdpSocket **rtpsocket, QUdpSocket **rtcpsocket) const - { - if (!m_useexistingsockets) { - return false; - } - - *rtpsocket = m_rtpsock; - *rtcpsocket = m_rtcpsock; - return true; - } - private: QHostAddress m_bindAddress; QNetworkInterface m_mcastInterface; @@ -218,7 +195,6 @@ private: uint16_t m_forcedrtcpport; QUdpSocket *m_rtpsock, *m_rtcpsock; - bool m_useexistingsockets; }; inline RTPUDPTransmissionParams::RTPUDPTransmissionParams() : @@ -232,7 +208,6 @@ inline RTPUDPTransmissionParams::RTPUDPTransmissionParams() : m_rtcpmux = false; m_allowoddportbase = false; m_forcedrtcpport = 0; - m_useexistingsockets = false; m_rtpsock = 0; m_rtcpsock = 0; } @@ -360,8 +335,6 @@ private: QQueue m_rawPacketQueue; QMutex m_rawPacketQueueLock; - bool m_closesocketswhendone; - bool ShouldAcceptData(const RTPAddress& address); private slots: diff --git a/sdrbase/CMakeLists.txt b/sdrbase/CMakeLists.txt index 185fc1f0e..ff2597f76 100644 --- a/sdrbase/CMakeLists.txt +++ b/sdrbase/CMakeLists.txt @@ -65,6 +65,7 @@ set(sdrbase_SOURCES util/message.cpp util/messagequeue.cpp util/prettyprint.cpp + util/rtpsink.cpp util/syncmessenger.cpp util/samplesourceserializer.cpp util/simpleserializer.cpp @@ -173,6 +174,7 @@ set(sdrbase_HEADERS util/messagequeue.h util/movingaverage.h util/prettyprint.h + util/rtpsink.h util/syncmessenger.h util/samplesourceserializer.h util/simpleserializer.h @@ -215,19 +217,6 @@ else(FFTW3F_FOUND) add_definitions(-DUSE_KISSFFT) endif(FFTW3F_FOUND) -if (JRTPLIB_FOUND) - set(sdrbase_HEADERS - ${sdrbase_HEADERS} - util/rtpsink.h - ) - set(sdrbase_SOURCES - ${sdrbase_SOURCES} - util/rtpsink.cpp - ) - add_definitions(-DHAS_JRTPLIB) - include_directories(${JRTPLIB_INCLUDE_DIR}) -endif(JRTPLIB_FOUND) - if (LIBSERIALDV_FOUND) set(sdrbase_SOURCES ${sdrbase_SOURCES} @@ -270,12 +259,14 @@ include_directories( ${CMAKE_CURRENT_BINARY_DIR} . ${CMAKE_SOURCE_DIR}/httpserver + ${CMAKE_SOURCE_DIR}/qrtplib ${CMAKE_SOURCE_DIR}/swagger/sdrangel/code/qt5/client ) target_link_libraries(sdrbase ${QT_LIBRARIES} httpserver + qrtplib swagger ) @@ -283,10 +274,6 @@ if(FFTW3F_FOUND) target_link_libraries(sdrbase ${FFTW3F_LIBRARIES}) endif(FFTW3F_FOUND) -if (JRTPLIB_FOUND) - target_link_libraries(sdrbase ${JRTPLIB_LIBRARIES}) -endif(JRTPLIB_FOUND) - if(LIBSERIALDV_FOUND) target_link_libraries(sdrbase ${LIBSERIALDV_LIBRARY}) endif(LIBSERIALDV_FOUND) diff --git a/sdrbase/audio/audionetsink.cpp b/sdrbase/audio/audionetsink.cpp index 50e4a2437..6444410e7 100644 --- a/sdrbase/audio/audionetsink.cpp +++ b/sdrbase/audio/audionetsink.cpp @@ -17,9 +17,7 @@ #include "audionetsink.h" #include "util/udpsink.h" -#ifdef HAS_JRTPLIB #include "util/rtpsink.h" -#endif const int AudioNetSink::m_udpBlockSize = 512; @@ -35,9 +33,7 @@ AudioNetSink::AudioNetSink(QObject *parent, bool stereo) : m_udpBufferAudioMono = new UDPSink(parent, m_udpBlockSize); } -#ifdef HAS_JRTPLIB m_rtpBufferAudio = new RTPSink("127.0.0.1", 9999, stereo ? RTPSink::PayloadL16Stereo : RTPSink::PayloadL16Mono); -#endif } AudioNetSink::~AudioNetSink() @@ -49,20 +45,14 @@ AudioNetSink::~AudioNetSink() if (m_udpBufferAudioStereo) { delete m_udpBufferAudioStereo; } -#ifdef HAS_JRTPLIB if (m_rtpBufferAudio) { delete m_rtpBufferAudio; } -#endif } bool AudioNetSink::isRTPCapable() const { -#ifdef HAS_JRTPLIB return m_rtpBufferAudio->isValid(); -#else - return false; -#endif } bool AudioNetSink::selectType(SinkType type) @@ -74,13 +64,8 @@ bool AudioNetSink::selectType(SinkType type) } else if (type == SinkRTP) { -#ifdef HAS_JRTPLIB m_type = SinkRTP; return true; -#else - m_type = SinkUDP; - return false; -#endif } else { @@ -96,39 +81,24 @@ void AudioNetSink::setDestination(const QString& address, uint16_t port) if (m_udpBufferAudioStereo) { m_udpBufferAudioStereo->setDestination(address, port); } - -#ifdef HAS_JRTPLIB if (m_rtpBufferAudio) { m_rtpBufferAudio->setDestination(address, port); } -#endif } -#ifdef HAS_JRTPLIB void AudioNetSink::addDestination(const QString& address, uint16_t port) { if (m_rtpBufferAudio) { m_rtpBufferAudio->addDestination(address, port); } } -#else -void AudioNetSink::addDestination(const QString& address __attribute__((unused)), uint16_t port __attribute__((unused))) -{ -} -#endif -#ifdef HAS_JRTPLIB void AudioNetSink::deleteDestination(const QString& address, uint16_t port) { if (m_rtpBufferAudio) { m_rtpBufferAudio->deleteDestination(address, port); } } -#else -void AudioNetSink::deleteDestination(const QString& address __attribute__((unused)), uint16_t port __attribute__((unused))) -{ -} -#endif void AudioNetSink::write(qint16 sample) { @@ -139,9 +109,7 @@ void AudioNetSink::write(qint16 sample) if (m_type == SinkUDP) { m_udpBufferAudioMono->write(sample); } else if (m_type == SinkRTP) { -#ifdef HAS_JRTPLIB m_rtpBufferAudio->write((uint8_t *) &sample); -#endif } } @@ -154,10 +122,18 @@ void AudioNetSink::write(const AudioSample& sample) if (m_type == SinkUDP) { m_udpBufferAudioStereo->write(sample); } else if (m_type == SinkRTP) { -#ifdef HAS_JRTPLIB m_rtpBufferAudio->write((uint8_t *) &sample); -#endif } } +void AudioNetSink::moveToThread(QThread *thread) +{ + if (m_udpBufferAudioMono) { + m_udpBufferAudioMono->moveToThread(thread); + } + + if (m_udpBufferAudioStereo) { + m_udpBufferAudioMono->moveToThread(thread); + } +} diff --git a/sdrbase/audio/audionetsink.h b/sdrbase/audio/audionetsink.h index 750aaf404..a1781de8c 100644 --- a/sdrbase/audio/audionetsink.h +++ b/sdrbase/audio/audionetsink.h @@ -24,6 +24,7 @@ template class UDPSink; class RTPSink; +class QThread; class SDRBASE_API AudioNetSink { public: @@ -46,6 +47,8 @@ public: bool isRTPCapable() const; bool selectType(SinkType type); + void moveToThread(QThread *thread); + static const int m_udpBlockSize; protected: diff --git a/sdrbase/dsp/basebandsamplesink.cpp b/sdrbase/dsp/basebandsamplesink.cpp index 7887f768d..81c9f53a9 100644 --- a/sdrbase/dsp/basebandsamplesink.cpp +++ b/sdrbase/dsp/basebandsamplesink.cpp @@ -1,5 +1,6 @@ -#include -#include "util/message.h" +#include "basebandsamplesink.h" + +MESSAGE_CLASS_DEFINITION(BasebandSampleSink::MsgThreadedSink, Message) BasebandSampleSink::BasebandSampleSink() : m_guiMessageQueue(0) diff --git a/sdrbase/dsp/basebandsamplesink.h b/sdrbase/dsp/basebandsamplesink.h index 462d0ec04..f70e93230 100644 --- a/sdrbase/dsp/basebandsamplesink.h +++ b/sdrbase/dsp/basebandsamplesink.h @@ -22,12 +22,34 @@ #include "dsp/dsptypes.h" #include "util/export.h" #include "util/messagequeue.h" +#include "util/message.h" class Message; class SDRBASE_API BasebandSampleSink : public QObject { Q_OBJECT public: + /** Used to notify on which thread the sample sink is now running (with ThreadedSampleSink) */ + class MsgThreadedSink : public Message { + MESSAGE_CLASS_DECLARATION + + public: + const QThread* getThread() const { return m_thread; } + + static MsgThreadedSink* create(const QThread* thread) + { + return new MsgThreadedSink(thread); + } + + private: + const QThread *m_thread; + + MsgThreadedSink(const QThread *thread) : + Message(), + m_thread(thread) + { } + }; + BasebandSampleSink(); virtual ~BasebandSampleSink(); diff --git a/sdrbase/dsp/downchannelizer.cpp b/sdrbase/dsp/downchannelizer.cpp index cfbec834d..3b2772db0 100644 --- a/sdrbase/dsp/downchannelizer.cpp +++ b/sdrbase/dsp/downchannelizer.cpp @@ -144,6 +144,10 @@ bool DownChannelizer::handleMessage(const Message& cmd) return true; } + else if (BasebandSampleSink::MsgThreadedSink::match(cmd)) + { + return m_sampleSink->handleMessage(cmd); // this message is passed to the demod + } else { return false; diff --git a/sdrbase/dsp/threadedbasebandsamplesink.cpp b/sdrbase/dsp/threadedbasebandsamplesink.cpp index 751e2ce19..0ab77303a 100644 --- a/sdrbase/dsp/threadedbasebandsamplesink.cpp +++ b/sdrbase/dsp/threadedbasebandsamplesink.cpp @@ -76,6 +76,10 @@ ThreadedBasebandSampleSink::ThreadedBasebandSampleSink(BasebandSampleSink* sampl //moveToThread(m_thread); // FIXME: Fixed? the intermediate FIFO should be handled within the sink. Define a new type of sink that is compatible with threading m_basebandSampleSink->moveToThread(m_thread); m_threadedBasebandSampleSinkFifo->moveToThread(m_thread); + BasebandSampleSink::MsgThreadedSink *msg = BasebandSampleSink::MsgThreadedSink::create(m_thread); // inform of the new thread + if (!m_basebandSampleSink->handleMessage(*msg)) { + delete msg; + } //m_sampleFifo.moveToThread(m_thread); //connect(&m_sampleFifo, SIGNAL(dataReady()), this, SLOT(handleData())); //m_sampleFifo.setSize(262144); diff --git a/sdrbase/dsp/threadedbasebandsamplesink.h b/sdrbase/dsp/threadedbasebandsamplesink.h index 9c9a9fe1a..727d03903 100644 --- a/sdrbase/dsp/threadedbasebandsamplesink.h +++ b/sdrbase/dsp/threadedbasebandsamplesink.h @@ -66,6 +66,7 @@ public: void feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool positiveOnly); //!< Feed sink with samples QString getSampleSinkObjectName() const; + const QThread *getThread() const { return m_thread; } protected: diff --git a/sdrbase/util/rtpsink.cpp b/sdrbase/util/rtpsink.cpp index e794117c5..76cba88d8 100644 --- a/sdrbase/util/rtpsink.cpp +++ b/sdrbase/util/rtpsink.cpp @@ -28,35 +28,28 @@ RTPSink::RTPSink(const QString& address, uint16_t port, PayloadType payloadType) m_sampleBufferIndex(0), m_byteBuffer(0), m_destport(port), - m_rtpTransmitter(&m_rtpMemoryManager), m_mutex(QMutex::Recursive) { - // Here we use JRTPLIB in a bit funny way since we do not want the socket to bind because we are only sending - // data to a remote party and we don't want to waste a port on the local machine for each possible connection that may not be used. - // Therefore we create a socket and assign it through the SetUseExistingSockets method of the RTPUDPv4TransmissionParams object - // By doing this the socket is left unbound but sending RTP packets with the library is still possible. Other functions may - // not work but we don't care - m_rtpSessionParams.SetOwnTimestampUnit(1.0 / (double) m_sampleRate); m_rtpTransmissionParams.SetRTCPMultiplexing(true); // do not allocate another socket for RTCP - int status = m_rtpTransmitter.Init(false); + int status = m_rtpTransmitter.Init(); if (status < 0) { - qCritical("RTPSink::RTPSink: cannot initialize transmitter: %s", jrtplib::RTPGetErrorString(status).c_str()); + qCritical("RTPSink::RTPSink: cannot initialize transmitter: %s", qrtplib::RTPGetErrorString(status).c_str()); m_valid = false; } else { - qDebug("RTPSink::RTPSink: initialized transmitter: %s", jrtplib::RTPGetErrorString(status).c_str()); + qDebug("RTPSink::RTPSink: initialized transmitter: %s", qrtplib::RTPGetErrorString(status).c_str()); } m_rtpTransmitter.Create(m_rtpSessionParams.GetMaximumPacketSize(), &m_rtpTransmissionParams); - qDebug("RTPSink::RTPSink: created transmitter: %s", jrtplib::RTPGetErrorString(status).c_str()); + qDebug("RTPSink::RTPSink: created transmitter: %s", qrtplib::RTPGetErrorString(status).c_str()); status = m_rtpSession.Create(m_rtpSessionParams, &m_rtpTransmitter); if (status < 0) { - qCritical("RTPSink::RTPSink: cannot create session: %s", jrtplib::RTPGetErrorString(status).c_str()); + qCritical("RTPSink::RTPSink: cannot create session: %s", qrtplib::RTPGetErrorString(status).c_str()); m_valid = false; } else { - qDebug("RTPSink::RTPSink: created session: %s", jrtplib::RTPGetErrorString(status).c_str()); + qDebug("RTPSink::RTPSink: created session: %s", qrtplib::RTPGetErrorString(status).c_str()); } setPayloadType(payloadType); @@ -66,14 +59,12 @@ RTPSink::RTPSink(const QString& address, uint16_t port, PayloadType payloadType) uint8_t *ptr = (uint8_t*) &endianTest32; m_endianReverse = (*ptr == 1); - m_destip = inet_addr(address.toStdString().c_str()); - m_destip = ntohl(m_destip); - + m_destip.setAddress(address); } RTPSink::~RTPSink() { - jrtplib::RTPTime delay = jrtplib::RTPTime(10.0); + qrtplib::RTPTime delay = qrtplib::RTPTime(10.0); m_rtpSession.BYEDestroy(delay, "Time's up", 9); if (m_byteBuffer) { @@ -116,79 +107,76 @@ void RTPSink::setPayloadType(PayloadType payloadType) int status = m_rtpSession.SetTimestampUnit(1.0 / (double) m_sampleRate); if (status < 0) { - qCritical("RTPSink::setPayloadType: cannot set timestamp unit: %s", jrtplib::RTPGetErrorString(status).c_str()); + qCritical("RTPSink::setPayloadType: cannot set timestamp unit: %s", qrtplib::RTPGetErrorString(status).c_str()); } else { qDebug("RTPSink::setPayloadType: timestamp unit set to %f: %s", 1.0 / (double) m_sampleRate, - jrtplib::RTPGetErrorString(status).c_str()); + qrtplib::RTPGetErrorString(status).c_str()); } status = m_rtpSession.SetDefaultMark(false); if (status < 0) { - qCritical("RTPSink::setPayloadType: cannot set default mark: %s", jrtplib::RTPGetErrorString(status).c_str()); + qCritical("RTPSink::setPayloadType: cannot set default mark: %s", qrtplib::RTPGetErrorString(status).c_str()); } else { - qDebug("RTPSink::setPayloadType: set default mark to false: %s", jrtplib::RTPGetErrorString(status).c_str()); + qDebug("RTPSink::setPayloadType: set default mark to false: %s", qrtplib::RTPGetErrorString(status).c_str()); } status = m_rtpSession.SetDefaultTimestampIncrement(m_packetSamples); if (status < 0) { - qCritical("RTPSink::setPayloadType: cannot set default timestamp increment: %s", jrtplib::RTPGetErrorString(status).c_str()); + qCritical("RTPSink::setPayloadType: cannot set default timestamp increment: %s", qrtplib::RTPGetErrorString(status).c_str()); } else { - qDebug("RTPSink::setPayloadType: set default timestamp increment to %d: %s", m_packetSamples, jrtplib::RTPGetErrorString(status).c_str()); + qDebug("RTPSink::setPayloadType: set default timestamp increment to %d: %s", m_packetSamples, qrtplib::RTPGetErrorString(status).c_str()); } status = m_rtpSession.SetMaximumPacketSize(m_bufferSize+40); if (status < 0) { - qCritical("RTPSink::setPayloadType: cannot set maximum packet size: %s", jrtplib::RTPGetErrorString(status).c_str()); + qCritical("RTPSink::setPayloadType: cannot set maximum packet size: %s", qrtplib::RTPGetErrorString(status).c_str()); } else { - qDebug("RTPSink::setPayloadType: set maximum packet size to %d bytes: %s", m_bufferSize+40, jrtplib::RTPGetErrorString(status).c_str()); + qDebug("RTPSink::setPayloadType: set maximum packet size to %d bytes: %s", m_bufferSize+40, qrtplib::RTPGetErrorString(status).c_str()); } } void RTPSink::setDestination(const QString& address, uint16_t port) { m_rtpSession.ClearDestinations(); - m_rtpSession.DeleteDestination(jrtplib::RTPIPv4Address(m_destip, m_destport)); - m_destip = inet_addr(address.toStdString().c_str()); - m_destip = ntohl(m_destip); + m_rtpSession.DeleteDestination(qrtplib::RTPAddress(m_destip, m_destport)); + m_destip.setAddress(address); m_destport = port; - int status = m_rtpSession.AddDestination(jrtplib::RTPIPv4Address(m_destip, m_destport)); + int status = m_rtpSession.AddDestination(qrtplib::RTPAddress(m_destip, m_destport)); if (status < 0) { - qCritical("RTPSink::setDestination: cannot set destination address: %s", jrtplib::RTPGetErrorString(status).c_str()); + qCritical("RTPSink::setDestination: cannot set destination address: %s", qrtplib::RTPGetErrorString(status).c_str()); } } void RTPSink::deleteDestination(const QString& address, uint16_t port) { - uint32_t destip = inet_addr(address.toStdString().c_str()); - destip = ntohl(m_destip); + QHostAddress destip(address); - int status = m_rtpSession.DeleteDestination(jrtplib::RTPIPv4Address(destip, port)); + int status = m_rtpSession.DeleteDestination(qrtplib::RTPAddress(destip, port)); if (status < 0) { - qCritical("RTPSink::deleteDestination: cannot delete destination address: %s", jrtplib::RTPGetErrorString(status).c_str()); + qCritical("RTPSink::deleteDestination: cannot delete destination address: %s", qrtplib::RTPGetErrorString(status).c_str()); } } void RTPSink::addDestination(const QString& address, uint16_t port) { - uint32_t destip = inet_addr(address.toStdString().c_str()); - destip = ntohl(m_destip); + QHostAddress destip(address); - int status = m_rtpSession.AddDestination(jrtplib::RTPIPv4Address(destip, port)); + int status = m_rtpSession.AddDestination(qrtplib::RTPAddress(destip, port)); if (status < 0) { - qCritical("RTPSink::addDestination: cannot add destination address: %s", jrtplib::RTPGetErrorString(status).c_str()); + qCritical("RTPSink::addDestination: cannot add destination address: %s", qrtplib::RTPGetErrorString(status).c_str()); } else { qDebug("RTPSink::addDestination: destination address set to %s:%d: %s", address.toStdString().c_str(), port, - jrtplib::RTPGetErrorString(status).c_str()); + qrtplib::RTPGetErrorString(status).c_str()); } } @@ -210,7 +198,7 @@ void RTPSink::write(const uint8_t *sampleByte) int status = m_rtpSession.SendPacket((const void *) m_byteBuffer, (std::size_t) m_bufferSize); if (status < 0) { - qCritical("RTPSink::write: cannot write packet: %s", jrtplib::RTPGetErrorString(status).c_str()); + qCritical("RTPSink::write: cannot write packet: %s", qrtplib::RTPGetErrorString(status).c_str()); } writeNetBuf(&m_byteBuffer[0], sampleByte, elemLength(m_payloadType), m_sampleBytes, m_endianReverse); diff --git a/sdrbase/util/rtpsink.h b/sdrbase/util/rtpsink.h index 28753f3c4..9b56c19be 100644 --- a/sdrbase/util/rtpsink.h +++ b/sdrbase/util/rtpsink.h @@ -21,48 +21,18 @@ #include #include #include +#include #include -// jrtplib includes +// qrtplib includes #include "rtpsession.h" -#include "rtpudpv4transmitternobind.h" -#include "rtpipv4address.h" +#include "rtpudptransmitter.h" +#include "rtpaddress.h" #include "rtpsessionparams.h" #include "rtperrors.h" -#include "rtplibraryversion.h" #include "util/export.h" -class SDRBASE_API RTPSinkMemoryManager : public jrtplib::RTPMemoryManager -{ -public: - RTPSinkMemoryManager() - { - alloccount = 0; - freecount = 0; - } - ~RTPSinkMemoryManager() - { - qDebug() << "RTPSinkMemoryManager::~RTPSinkMemoryManager: alloc: " << alloccount << " free: " << freecount; - } - void *AllocateBuffer(size_t numbytes, int memtype) - { - void *buf = malloc(numbytes); - qDebug() << "RTPSinkMemoryManager::AllocateBuffer: Allocated " << numbytes << " bytes at location " << buf << " (memtype = " << memtype << ")"; - alloccount++; - return buf; - } - - void FreeBuffer(void *p) - { - qDebug() << "RTPSinkMemoryManager::FreeBuffer: Freeing block " << p; - freecount++; - free(p); - } -private: - int alloccount,freecount; -}; - class RTPSink { public: @@ -98,13 +68,12 @@ protected: int m_bufferSize; int m_sampleBufferIndex; uint8_t *m_byteBuffer; - uint32_t m_destip; + QHostAddress m_destip; uint16_t m_destport; - jrtplib::RTPSession m_rtpSession; - jrtplib::RTPSessionParams m_rtpSessionParams; - jrtplib::RTPUDPv4TransmissionNoBindParams m_rtpTransmissionParams; - jrtplib::RTPUDPv4TransmitterNoBind m_rtpTransmitter; - RTPSinkMemoryManager m_rtpMemoryManager; + qrtplib::RTPSession m_rtpSession; + qrtplib::RTPSessionParams m_rtpSessionParams; + qrtplib::RTPUDPTransmissionParams m_rtpTransmissionParams; + qrtplib::RTPUDPTransmitter m_rtpTransmitter; bool m_endianReverse; QMutex m_mutex; }; diff --git a/sdrbase/util/udpsink.h b/sdrbase/util/udpsink.h index c271b172e..17ed66a2b 100644 --- a/sdrbase/util/udpsink.h +++ b/sdrbase/util/udpsink.h @@ -71,6 +71,11 @@ public: delete m_socket; } + void moveToThread(QThread *thread) + { + m_socket->moveToThread(thread); + } + void setAddress(QString& address) { m_address.setAddress(address); } void setPort(unsigned int port) { m_port = port; }