mirror of https://github.com/f4exb/sdrangel.git
qrtplib: NFM demod implementation. Also solve issue Cannot create children for a parent that is in a different thread on UDP sink used for copy audio to UDP
This commit is contained in:
parent
9f9eaa7a88
commit
9dacbb6d83
|
@ -77,7 +77,7 @@ NFMDemod::NFMDemod(DeviceSourceAPI *devieAPI) :
|
||||||
m_afSquelch.setCoefficients(24, 600, 48000.0, 200, 0); // 0.5ms test period, 300ms average span, 48kS/s SR, 100ms attack, no decay
|
m_afSquelch.setCoefficients(24, 600, 48000.0, 200, 0); // 0.5ms test period, 300ms average span, 48kS/s SR, 100ms attack, no decay
|
||||||
|
|
||||||
DSPEngine::instance()->addAudioSink(&m_audioFifo);
|
DSPEngine::instance()->addAudioSink(&m_audioFifo);
|
||||||
m_audioNetSink = new AudioNetSink(this);
|
m_audioNetSink = new AudioNetSink(0); // parent thread allocated dynamically
|
||||||
m_audioNetSink->setDestination(m_settings.m_udpAddress, m_settings.m_udpPort);
|
m_audioNetSink->setDestination(m_settings.m_udpAddress, m_settings.m_udpPort);
|
||||||
|
|
||||||
m_channelizer = new DownChannelizer(this);
|
m_channelizer = new DownChannelizer(this);
|
||||||
|
@ -374,6 +374,14 @@ bool NFMDemod::handleMessage(const Message& cmd)
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
else if (BasebandSampleSink::MsgThreadedSink::match(cmd))
|
||||||
|
{
|
||||||
|
BasebandSampleSink::MsgThreadedSink& cfg = (BasebandSampleSink::MsgThreadedSink&) cmd;
|
||||||
|
const QThread *thread = cfg.getThread();
|
||||||
|
qDebug("NFMDemod::handleMessage: BasebandSampleSink::MsgThreadedSink: %p", thread);
|
||||||
|
m_audioNetSink->moveToThread(const_cast<QThread*>(thread)); // use the thread for udp sinks
|
||||||
|
return true;
|
||||||
|
}
|
||||||
else if (DSPSignalNotification::match(cmd))
|
else if (DSPSignalNotification::match(cmd))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -49,7 +49,6 @@ RTPUDPTransmitter::RTPUDPTransmitter() :
|
||||||
m_rtcpsock = 0;
|
m_rtcpsock = 0;
|
||||||
m_rtpsock = 0;
|
m_rtpsock = 0;
|
||||||
m_waitingfordata = false;
|
m_waitingfordata = false;
|
||||||
m_closesocketswhendone = false;
|
|
||||||
m_rtcpPort = 0;
|
m_rtcpPort = 0;
|
||||||
m_rtpPort = 0;
|
m_rtpPort = 0;
|
||||||
m_receivemode = RTPTransmitter::AcceptAll;
|
m_receivemode = RTPTransmitter::AcceptAll;
|
||||||
|
@ -123,35 +122,26 @@ int RTPUDPTransmitter::Create(std::size_t maximumpacketsize, const RTPTransmissi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (params->GetUseExistingSockets(&m_rtpsock, &m_rtcpsock))
|
m_rtpsock = new QUdpSocket();
|
||||||
|
|
||||||
|
// If we're multiplexing, we're just going to set the RTCP socket to equal the RTP socket
|
||||||
|
if (params->GetRTCPMultiplexing())
|
||||||
{
|
{
|
||||||
m_closesocketswhendone = false;
|
m_rtcpsock = m_rtpsock;
|
||||||
|
m_rtcpPort = m_rtpPort;
|
||||||
|
} else {
|
||||||
|
m_rtcpsock = new QUdpSocket();
|
||||||
}
|
}
|
||||||
else
|
|
||||||
|
// set socket buffer sizes
|
||||||
|
|
||||||
|
size = params->GetRTPReceiveBufferSize();
|
||||||
|
m_rtpsock->setReadBufferSize(size);
|
||||||
|
|
||||||
|
if (m_rtpsock != m_rtcpsock)
|
||||||
{
|
{
|
||||||
m_rtpsock = new QUdpSocket();
|
size = params->GetRTCPReceiveBufferSize();
|
||||||
|
m_rtcpsock->setReadBufferSize(size);
|
||||||
// If we're multiplexing, we're just going to set the RTCP socket to equal the RTP socket
|
|
||||||
if (params->GetRTCPMultiplexing())
|
|
||||||
{
|
|
||||||
m_rtcpsock = m_rtpsock;
|
|
||||||
m_rtcpPort = m_rtpPort;
|
|
||||||
} else {
|
|
||||||
m_rtcpsock = new QUdpSocket();
|
|
||||||
}
|
|
||||||
|
|
||||||
m_closesocketswhendone = true;
|
|
||||||
|
|
||||||
// set socket buffer sizes
|
|
||||||
|
|
||||||
size = params->GetRTPReceiveBufferSize();
|
|
||||||
m_rtpsock->setReadBufferSize(size);
|
|
||||||
|
|
||||||
if (m_rtpsock != m_rtcpsock)
|
|
||||||
{
|
|
||||||
size = params->GetRTCPReceiveBufferSize();
|
|
||||||
m_rtcpsock->setReadBufferSize(size);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
m_maxpacksize = maximumpacketsize;
|
m_maxpacksize = maximumpacketsize;
|
||||||
|
@ -195,15 +185,12 @@ void RTPUDPTransmitter::Destroy()
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m_closesocketswhendone)
|
if (m_rtpsock != m_rtcpsock) {
|
||||||
{
|
delete m_rtcpsock;
|
||||||
if (m_rtpsock != m_rtcpsock) {
|
|
||||||
delete m_rtcpsock;
|
|
||||||
}
|
|
||||||
|
|
||||||
delete m_rtpsock;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
delete m_rtpsock;
|
||||||
|
|
||||||
m_created = false;
|
m_created = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -142,16 +142,6 @@ public:
|
||||||
m_forcedrtcpport = rtcpport;
|
m_forcedrtcpport = rtcpport;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Use sockets that have already been created, no checks on port numbers
|
|
||||||
* will be done, and no buffer sizes will be set; you'll need to close
|
|
||||||
* the sockets yourself when done, it will **not** be done automatically. */
|
|
||||||
void SetUseExistingSockets(QUdpSocket *rtpsocket, QUdpSocket *rtcpsocket)
|
|
||||||
{
|
|
||||||
m_rtpsock = rtpsocket;
|
|
||||||
m_rtcpsock = rtcpsocket;
|
|
||||||
m_useexistingsockets = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the RTP socket's send buffer size. */
|
/** Returns the RTP socket's send buffer size. */
|
||||||
int GetRTPSendBufferSize() const
|
int GetRTPSendBufferSize() const
|
||||||
{
|
{
|
||||||
|
@ -194,19 +184,6 @@ public:
|
||||||
return m_forcedrtcpport;
|
return m_forcedrtcpport;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Returns true and fills in sockets if existing sockets were set
|
|
||||||
* using RTPUDPv4TransmissionParams::SetUseExistingSockets. */
|
|
||||||
bool GetUseExistingSockets(QUdpSocket **rtpsocket, QUdpSocket **rtcpsocket) const
|
|
||||||
{
|
|
||||||
if (!m_useexistingsockets) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
*rtpsocket = m_rtpsock;
|
|
||||||
*rtcpsocket = m_rtcpsock;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
QHostAddress m_bindAddress;
|
QHostAddress m_bindAddress;
|
||||||
QNetworkInterface m_mcastInterface;
|
QNetworkInterface m_mcastInterface;
|
||||||
|
@ -218,7 +195,6 @@ private:
|
||||||
uint16_t m_forcedrtcpport;
|
uint16_t m_forcedrtcpport;
|
||||||
|
|
||||||
QUdpSocket *m_rtpsock, *m_rtcpsock;
|
QUdpSocket *m_rtpsock, *m_rtcpsock;
|
||||||
bool m_useexistingsockets;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
inline RTPUDPTransmissionParams::RTPUDPTransmissionParams() :
|
inline RTPUDPTransmissionParams::RTPUDPTransmissionParams() :
|
||||||
|
@ -232,7 +208,6 @@ inline RTPUDPTransmissionParams::RTPUDPTransmissionParams() :
|
||||||
m_rtcpmux = false;
|
m_rtcpmux = false;
|
||||||
m_allowoddportbase = false;
|
m_allowoddportbase = false;
|
||||||
m_forcedrtcpport = 0;
|
m_forcedrtcpport = 0;
|
||||||
m_useexistingsockets = false;
|
|
||||||
m_rtpsock = 0;
|
m_rtpsock = 0;
|
||||||
m_rtcpsock = 0;
|
m_rtcpsock = 0;
|
||||||
}
|
}
|
||||||
|
@ -360,8 +335,6 @@ private:
|
||||||
QQueue<RTPRawPacket*> m_rawPacketQueue;
|
QQueue<RTPRawPacket*> m_rawPacketQueue;
|
||||||
QMutex m_rawPacketQueueLock;
|
QMutex m_rawPacketQueueLock;
|
||||||
|
|
||||||
bool m_closesocketswhendone;
|
|
||||||
|
|
||||||
bool ShouldAcceptData(const RTPAddress& address);
|
bool ShouldAcceptData(const RTPAddress& address);
|
||||||
|
|
||||||
private slots:
|
private slots:
|
||||||
|
|
|
@ -65,6 +65,7 @@ set(sdrbase_SOURCES
|
||||||
util/message.cpp
|
util/message.cpp
|
||||||
util/messagequeue.cpp
|
util/messagequeue.cpp
|
||||||
util/prettyprint.cpp
|
util/prettyprint.cpp
|
||||||
|
util/rtpsink.cpp
|
||||||
util/syncmessenger.cpp
|
util/syncmessenger.cpp
|
||||||
util/samplesourceserializer.cpp
|
util/samplesourceserializer.cpp
|
||||||
util/simpleserializer.cpp
|
util/simpleserializer.cpp
|
||||||
|
@ -173,6 +174,7 @@ set(sdrbase_HEADERS
|
||||||
util/messagequeue.h
|
util/messagequeue.h
|
||||||
util/movingaverage.h
|
util/movingaverage.h
|
||||||
util/prettyprint.h
|
util/prettyprint.h
|
||||||
|
util/rtpsink.h
|
||||||
util/syncmessenger.h
|
util/syncmessenger.h
|
||||||
util/samplesourceserializer.h
|
util/samplesourceserializer.h
|
||||||
util/simpleserializer.h
|
util/simpleserializer.h
|
||||||
|
@ -215,19 +217,6 @@ else(FFTW3F_FOUND)
|
||||||
add_definitions(-DUSE_KISSFFT)
|
add_definitions(-DUSE_KISSFFT)
|
||||||
endif(FFTW3F_FOUND)
|
endif(FFTW3F_FOUND)
|
||||||
|
|
||||||
if (JRTPLIB_FOUND)
|
|
||||||
set(sdrbase_HEADERS
|
|
||||||
${sdrbase_HEADERS}
|
|
||||||
util/rtpsink.h
|
|
||||||
)
|
|
||||||
set(sdrbase_SOURCES
|
|
||||||
${sdrbase_SOURCES}
|
|
||||||
util/rtpsink.cpp
|
|
||||||
)
|
|
||||||
add_definitions(-DHAS_JRTPLIB)
|
|
||||||
include_directories(${JRTPLIB_INCLUDE_DIR})
|
|
||||||
endif(JRTPLIB_FOUND)
|
|
||||||
|
|
||||||
if (LIBSERIALDV_FOUND)
|
if (LIBSERIALDV_FOUND)
|
||||||
set(sdrbase_SOURCES
|
set(sdrbase_SOURCES
|
||||||
${sdrbase_SOURCES}
|
${sdrbase_SOURCES}
|
||||||
|
@ -270,12 +259,14 @@ include_directories(
|
||||||
${CMAKE_CURRENT_BINARY_DIR}
|
${CMAKE_CURRENT_BINARY_DIR}
|
||||||
.
|
.
|
||||||
${CMAKE_SOURCE_DIR}/httpserver
|
${CMAKE_SOURCE_DIR}/httpserver
|
||||||
|
${CMAKE_SOURCE_DIR}/qrtplib
|
||||||
${CMAKE_SOURCE_DIR}/swagger/sdrangel/code/qt5/client
|
${CMAKE_SOURCE_DIR}/swagger/sdrangel/code/qt5/client
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(sdrbase
|
target_link_libraries(sdrbase
|
||||||
${QT_LIBRARIES}
|
${QT_LIBRARIES}
|
||||||
httpserver
|
httpserver
|
||||||
|
qrtplib
|
||||||
swagger
|
swagger
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -283,10 +274,6 @@ if(FFTW3F_FOUND)
|
||||||
target_link_libraries(sdrbase ${FFTW3F_LIBRARIES})
|
target_link_libraries(sdrbase ${FFTW3F_LIBRARIES})
|
||||||
endif(FFTW3F_FOUND)
|
endif(FFTW3F_FOUND)
|
||||||
|
|
||||||
if (JRTPLIB_FOUND)
|
|
||||||
target_link_libraries(sdrbase ${JRTPLIB_LIBRARIES})
|
|
||||||
endif(JRTPLIB_FOUND)
|
|
||||||
|
|
||||||
if(LIBSERIALDV_FOUND)
|
if(LIBSERIALDV_FOUND)
|
||||||
target_link_libraries(sdrbase ${LIBSERIALDV_LIBRARY})
|
target_link_libraries(sdrbase ${LIBSERIALDV_LIBRARY})
|
||||||
endif(LIBSERIALDV_FOUND)
|
endif(LIBSERIALDV_FOUND)
|
||||||
|
|
|
@ -17,9 +17,7 @@
|
||||||
|
|
||||||
#include "audionetsink.h"
|
#include "audionetsink.h"
|
||||||
#include "util/udpsink.h"
|
#include "util/udpsink.h"
|
||||||
#ifdef HAS_JRTPLIB
|
|
||||||
#include "util/rtpsink.h"
|
#include "util/rtpsink.h"
|
||||||
#endif
|
|
||||||
|
|
||||||
const int AudioNetSink::m_udpBlockSize = 512;
|
const int AudioNetSink::m_udpBlockSize = 512;
|
||||||
|
|
||||||
|
@ -35,9 +33,7 @@ AudioNetSink::AudioNetSink(QObject *parent, bool stereo) :
|
||||||
m_udpBufferAudioMono = new UDPSink<int16_t>(parent, m_udpBlockSize);
|
m_udpBufferAudioMono = new UDPSink<int16_t>(parent, m_udpBlockSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef HAS_JRTPLIB
|
|
||||||
m_rtpBufferAudio = new RTPSink("127.0.0.1", 9999, stereo ? RTPSink::PayloadL16Stereo : RTPSink::PayloadL16Mono);
|
m_rtpBufferAudio = new RTPSink("127.0.0.1", 9999, stereo ? RTPSink::PayloadL16Stereo : RTPSink::PayloadL16Mono);
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
AudioNetSink::~AudioNetSink()
|
AudioNetSink::~AudioNetSink()
|
||||||
|
@ -49,20 +45,14 @@ AudioNetSink::~AudioNetSink()
|
||||||
if (m_udpBufferAudioStereo) {
|
if (m_udpBufferAudioStereo) {
|
||||||
delete m_udpBufferAudioStereo;
|
delete m_udpBufferAudioStereo;
|
||||||
}
|
}
|
||||||
#ifdef HAS_JRTPLIB
|
|
||||||
if (m_rtpBufferAudio) {
|
if (m_rtpBufferAudio) {
|
||||||
delete m_rtpBufferAudio;
|
delete m_rtpBufferAudio;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool AudioNetSink::isRTPCapable() const
|
bool AudioNetSink::isRTPCapable() const
|
||||||
{
|
{
|
||||||
#ifdef HAS_JRTPLIB
|
|
||||||
return m_rtpBufferAudio->isValid();
|
return m_rtpBufferAudio->isValid();
|
||||||
#else
|
|
||||||
return false;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool AudioNetSink::selectType(SinkType type)
|
bool AudioNetSink::selectType(SinkType type)
|
||||||
|
@ -74,13 +64,8 @@ bool AudioNetSink::selectType(SinkType type)
|
||||||
}
|
}
|
||||||
else if (type == SinkRTP)
|
else if (type == SinkRTP)
|
||||||
{
|
{
|
||||||
#ifdef HAS_JRTPLIB
|
|
||||||
m_type = SinkRTP;
|
m_type = SinkRTP;
|
||||||
return true;
|
return true;
|
||||||
#else
|
|
||||||
m_type = SinkUDP;
|
|
||||||
return false;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -96,39 +81,24 @@ void AudioNetSink::setDestination(const QString& address, uint16_t port)
|
||||||
if (m_udpBufferAudioStereo) {
|
if (m_udpBufferAudioStereo) {
|
||||||
m_udpBufferAudioStereo->setDestination(address, port);
|
m_udpBufferAudioStereo->setDestination(address, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef HAS_JRTPLIB
|
|
||||||
if (m_rtpBufferAudio) {
|
if (m_rtpBufferAudio) {
|
||||||
m_rtpBufferAudio->setDestination(address, port);
|
m_rtpBufferAudio->setDestination(address, port);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef HAS_JRTPLIB
|
|
||||||
void AudioNetSink::addDestination(const QString& address, uint16_t port)
|
void AudioNetSink::addDestination(const QString& address, uint16_t port)
|
||||||
{
|
{
|
||||||
if (m_rtpBufferAudio) {
|
if (m_rtpBufferAudio) {
|
||||||
m_rtpBufferAudio->addDestination(address, port);
|
m_rtpBufferAudio->addDestination(address, port);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
|
||||||
void AudioNetSink::addDestination(const QString& address __attribute__((unused)), uint16_t port __attribute__((unused)))
|
|
||||||
{
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef HAS_JRTPLIB
|
|
||||||
void AudioNetSink::deleteDestination(const QString& address, uint16_t port)
|
void AudioNetSink::deleteDestination(const QString& address, uint16_t port)
|
||||||
{
|
{
|
||||||
if (m_rtpBufferAudio) {
|
if (m_rtpBufferAudio) {
|
||||||
m_rtpBufferAudio->deleteDestination(address, port);
|
m_rtpBufferAudio->deleteDestination(address, port);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
|
||||||
void AudioNetSink::deleteDestination(const QString& address __attribute__((unused)), uint16_t port __attribute__((unused)))
|
|
||||||
{
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void AudioNetSink::write(qint16 sample)
|
void AudioNetSink::write(qint16 sample)
|
||||||
{
|
{
|
||||||
|
@ -139,9 +109,7 @@ void AudioNetSink::write(qint16 sample)
|
||||||
if (m_type == SinkUDP) {
|
if (m_type == SinkUDP) {
|
||||||
m_udpBufferAudioMono->write(sample);
|
m_udpBufferAudioMono->write(sample);
|
||||||
} else if (m_type == SinkRTP) {
|
} else if (m_type == SinkRTP) {
|
||||||
#ifdef HAS_JRTPLIB
|
|
||||||
m_rtpBufferAudio->write((uint8_t *) &sample);
|
m_rtpBufferAudio->write((uint8_t *) &sample);
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,10 +122,18 @@ void AudioNetSink::write(const AudioSample& sample)
|
||||||
if (m_type == SinkUDP) {
|
if (m_type == SinkUDP) {
|
||||||
m_udpBufferAudioStereo->write(sample);
|
m_udpBufferAudioStereo->write(sample);
|
||||||
} else if (m_type == SinkRTP) {
|
} else if (m_type == SinkRTP) {
|
||||||
#ifdef HAS_JRTPLIB
|
|
||||||
m_rtpBufferAudio->write((uint8_t *) &sample);
|
m_rtpBufferAudio->write((uint8_t *) &sample);
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void AudioNetSink::moveToThread(QThread *thread)
|
||||||
|
{
|
||||||
|
if (m_udpBufferAudioMono) {
|
||||||
|
m_udpBufferAudioMono->moveToThread(thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_udpBufferAudioStereo) {
|
||||||
|
m_udpBufferAudioMono->moveToThread(thread);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
template<typename T> class UDPSink;
|
template<typename T> class UDPSink;
|
||||||
class RTPSink;
|
class RTPSink;
|
||||||
|
class QThread;
|
||||||
|
|
||||||
class SDRBASE_API AudioNetSink {
|
class SDRBASE_API AudioNetSink {
|
||||||
public:
|
public:
|
||||||
|
@ -46,6 +47,8 @@ public:
|
||||||
bool isRTPCapable() const;
|
bool isRTPCapable() const;
|
||||||
bool selectType(SinkType type);
|
bool selectType(SinkType type);
|
||||||
|
|
||||||
|
void moveToThread(QThread *thread);
|
||||||
|
|
||||||
static const int m_udpBlockSize;
|
static const int m_udpBlockSize;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
#include <dsp/basebandsamplesink.h>
|
#include "basebandsamplesink.h"
|
||||||
#include "util/message.h"
|
|
||||||
|
MESSAGE_CLASS_DEFINITION(BasebandSampleSink::MsgThreadedSink, Message)
|
||||||
|
|
||||||
BasebandSampleSink::BasebandSampleSink() :
|
BasebandSampleSink::BasebandSampleSink() :
|
||||||
m_guiMessageQueue(0)
|
m_guiMessageQueue(0)
|
||||||
|
|
|
@ -22,12 +22,34 @@
|
||||||
#include "dsp/dsptypes.h"
|
#include "dsp/dsptypes.h"
|
||||||
#include "util/export.h"
|
#include "util/export.h"
|
||||||
#include "util/messagequeue.h"
|
#include "util/messagequeue.h"
|
||||||
|
#include "util/message.h"
|
||||||
|
|
||||||
class Message;
|
class Message;
|
||||||
|
|
||||||
class SDRBASE_API BasebandSampleSink : public QObject {
|
class SDRBASE_API BasebandSampleSink : public QObject {
|
||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
public:
|
public:
|
||||||
|
/** Used to notify on which thread the sample sink is now running (with ThreadedSampleSink) */
|
||||||
|
class MsgThreadedSink : public Message {
|
||||||
|
MESSAGE_CLASS_DECLARATION
|
||||||
|
|
||||||
|
public:
|
||||||
|
const QThread* getThread() const { return m_thread; }
|
||||||
|
|
||||||
|
static MsgThreadedSink* create(const QThread* thread)
|
||||||
|
{
|
||||||
|
return new MsgThreadedSink(thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
const QThread *m_thread;
|
||||||
|
|
||||||
|
MsgThreadedSink(const QThread *thread) :
|
||||||
|
Message(),
|
||||||
|
m_thread(thread)
|
||||||
|
{ }
|
||||||
|
};
|
||||||
|
|
||||||
BasebandSampleSink();
|
BasebandSampleSink();
|
||||||
virtual ~BasebandSampleSink();
|
virtual ~BasebandSampleSink();
|
||||||
|
|
||||||
|
|
|
@ -144,6 +144,10 @@ bool DownChannelizer::handleMessage(const Message& cmd)
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
else if (BasebandSampleSink::MsgThreadedSink::match(cmd))
|
||||||
|
{
|
||||||
|
return m_sampleSink->handleMessage(cmd); // this message is passed to the demod
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -76,6 +76,10 @@ ThreadedBasebandSampleSink::ThreadedBasebandSampleSink(BasebandSampleSink* sampl
|
||||||
//moveToThread(m_thread); // FIXME: Fixed? the intermediate FIFO should be handled within the sink. Define a new type of sink that is compatible with threading
|
//moveToThread(m_thread); // FIXME: Fixed? the intermediate FIFO should be handled within the sink. Define a new type of sink that is compatible with threading
|
||||||
m_basebandSampleSink->moveToThread(m_thread);
|
m_basebandSampleSink->moveToThread(m_thread);
|
||||||
m_threadedBasebandSampleSinkFifo->moveToThread(m_thread);
|
m_threadedBasebandSampleSinkFifo->moveToThread(m_thread);
|
||||||
|
BasebandSampleSink::MsgThreadedSink *msg = BasebandSampleSink::MsgThreadedSink::create(m_thread); // inform of the new thread
|
||||||
|
if (!m_basebandSampleSink->handleMessage(*msg)) {
|
||||||
|
delete msg;
|
||||||
|
}
|
||||||
//m_sampleFifo.moveToThread(m_thread);
|
//m_sampleFifo.moveToThread(m_thread);
|
||||||
//connect(&m_sampleFifo, SIGNAL(dataReady()), this, SLOT(handleData()));
|
//connect(&m_sampleFifo, SIGNAL(dataReady()), this, SLOT(handleData()));
|
||||||
//m_sampleFifo.setSize(262144);
|
//m_sampleFifo.setSize(262144);
|
||||||
|
|
|
@ -66,6 +66,7 @@ public:
|
||||||
void feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool positiveOnly); //!< Feed sink with samples
|
void feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool positiveOnly); //!< Feed sink with samples
|
||||||
|
|
||||||
QString getSampleSinkObjectName() const;
|
QString getSampleSinkObjectName() const;
|
||||||
|
const QThread *getThread() const { return m_thread; }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
|
|
|
@ -28,35 +28,28 @@ RTPSink::RTPSink(const QString& address, uint16_t port, PayloadType payloadType)
|
||||||
m_sampleBufferIndex(0),
|
m_sampleBufferIndex(0),
|
||||||
m_byteBuffer(0),
|
m_byteBuffer(0),
|
||||||
m_destport(port),
|
m_destport(port),
|
||||||
m_rtpTransmitter(&m_rtpMemoryManager),
|
|
||||||
m_mutex(QMutex::Recursive)
|
m_mutex(QMutex::Recursive)
|
||||||
{
|
{
|
||||||
// Here we use JRTPLIB in a bit funny way since we do not want the socket to bind because we are only sending
|
|
||||||
// data to a remote party and we don't want to waste a port on the local machine for each possible connection that may not be used.
|
|
||||||
// Therefore we create a socket and assign it through the SetUseExistingSockets method of the RTPUDPv4TransmissionParams object
|
|
||||||
// By doing this the socket is left unbound but sending RTP packets with the library is still possible. Other functions may
|
|
||||||
// not work but we don't care
|
|
||||||
|
|
||||||
m_rtpSessionParams.SetOwnTimestampUnit(1.0 / (double) m_sampleRate);
|
m_rtpSessionParams.SetOwnTimestampUnit(1.0 / (double) m_sampleRate);
|
||||||
m_rtpTransmissionParams.SetRTCPMultiplexing(true); // do not allocate another socket for RTCP
|
m_rtpTransmissionParams.SetRTCPMultiplexing(true); // do not allocate another socket for RTCP
|
||||||
|
|
||||||
int status = m_rtpTransmitter.Init(false);
|
int status = m_rtpTransmitter.Init();
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
qCritical("RTPSink::RTPSink: cannot initialize transmitter: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qCritical("RTPSink::RTPSink: cannot initialize transmitter: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
m_valid = false;
|
m_valid = false;
|
||||||
} else {
|
} else {
|
||||||
qDebug("RTPSink::RTPSink: initialized transmitter: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qDebug("RTPSink::RTPSink: initialized transmitter: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
m_rtpTransmitter.Create(m_rtpSessionParams.GetMaximumPacketSize(), &m_rtpTransmissionParams);
|
m_rtpTransmitter.Create(m_rtpSessionParams.GetMaximumPacketSize(), &m_rtpTransmissionParams);
|
||||||
qDebug("RTPSink::RTPSink: created transmitter: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qDebug("RTPSink::RTPSink: created transmitter: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
|
|
||||||
status = m_rtpSession.Create(m_rtpSessionParams, &m_rtpTransmitter);
|
status = m_rtpSession.Create(m_rtpSessionParams, &m_rtpTransmitter);
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
qCritical("RTPSink::RTPSink: cannot create session: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qCritical("RTPSink::RTPSink: cannot create session: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
m_valid = false;
|
m_valid = false;
|
||||||
} else {
|
} else {
|
||||||
qDebug("RTPSink::RTPSink: created session: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qDebug("RTPSink::RTPSink: created session: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
setPayloadType(payloadType);
|
setPayloadType(payloadType);
|
||||||
|
@ -66,14 +59,12 @@ RTPSink::RTPSink(const QString& address, uint16_t port, PayloadType payloadType)
|
||||||
uint8_t *ptr = (uint8_t*) &endianTest32;
|
uint8_t *ptr = (uint8_t*) &endianTest32;
|
||||||
m_endianReverse = (*ptr == 1);
|
m_endianReverse = (*ptr == 1);
|
||||||
|
|
||||||
m_destip = inet_addr(address.toStdString().c_str());
|
m_destip.setAddress(address);
|
||||||
m_destip = ntohl(m_destip);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RTPSink::~RTPSink()
|
RTPSink::~RTPSink()
|
||||||
{
|
{
|
||||||
jrtplib::RTPTime delay = jrtplib::RTPTime(10.0);
|
qrtplib::RTPTime delay = qrtplib::RTPTime(10.0);
|
||||||
m_rtpSession.BYEDestroy(delay, "Time's up", 9);
|
m_rtpSession.BYEDestroy(delay, "Time's up", 9);
|
||||||
|
|
||||||
if (m_byteBuffer) {
|
if (m_byteBuffer) {
|
||||||
|
@ -116,79 +107,76 @@ void RTPSink::setPayloadType(PayloadType payloadType)
|
||||||
int status = m_rtpSession.SetTimestampUnit(1.0 / (double) m_sampleRate);
|
int status = m_rtpSession.SetTimestampUnit(1.0 / (double) m_sampleRate);
|
||||||
|
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
qCritical("RTPSink::setPayloadType: cannot set timestamp unit: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qCritical("RTPSink::setPayloadType: cannot set timestamp unit: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
} else {
|
} else {
|
||||||
qDebug("RTPSink::setPayloadType: timestamp unit set to %f: %s",
|
qDebug("RTPSink::setPayloadType: timestamp unit set to %f: %s",
|
||||||
1.0 / (double) m_sampleRate,
|
1.0 / (double) m_sampleRate,
|
||||||
jrtplib::RTPGetErrorString(status).c_str());
|
qrtplib::RTPGetErrorString(status).c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
status = m_rtpSession.SetDefaultMark(false);
|
status = m_rtpSession.SetDefaultMark(false);
|
||||||
|
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
qCritical("RTPSink::setPayloadType: cannot set default mark: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qCritical("RTPSink::setPayloadType: cannot set default mark: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
} else {
|
} else {
|
||||||
qDebug("RTPSink::setPayloadType: set default mark to false: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qDebug("RTPSink::setPayloadType: set default mark to false: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
status = m_rtpSession.SetDefaultTimestampIncrement(m_packetSamples);
|
status = m_rtpSession.SetDefaultTimestampIncrement(m_packetSamples);
|
||||||
|
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
qCritical("RTPSink::setPayloadType: cannot set default timestamp increment: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qCritical("RTPSink::setPayloadType: cannot set default timestamp increment: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
} else {
|
} else {
|
||||||
qDebug("RTPSink::setPayloadType: set default timestamp increment to %d: %s", m_packetSamples, jrtplib::RTPGetErrorString(status).c_str());
|
qDebug("RTPSink::setPayloadType: set default timestamp increment to %d: %s", m_packetSamples, qrtplib::RTPGetErrorString(status).c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
status = m_rtpSession.SetMaximumPacketSize(m_bufferSize+40);
|
status = m_rtpSession.SetMaximumPacketSize(m_bufferSize+40);
|
||||||
|
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
qCritical("RTPSink::setPayloadType: cannot set maximum packet size: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qCritical("RTPSink::setPayloadType: cannot set maximum packet size: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
} else {
|
} else {
|
||||||
qDebug("RTPSink::setPayloadType: set maximum packet size to %d bytes: %s", m_bufferSize+40, jrtplib::RTPGetErrorString(status).c_str());
|
qDebug("RTPSink::setPayloadType: set maximum packet size to %d bytes: %s", m_bufferSize+40, qrtplib::RTPGetErrorString(status).c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RTPSink::setDestination(const QString& address, uint16_t port)
|
void RTPSink::setDestination(const QString& address, uint16_t port)
|
||||||
{
|
{
|
||||||
m_rtpSession.ClearDestinations();
|
m_rtpSession.ClearDestinations();
|
||||||
m_rtpSession.DeleteDestination(jrtplib::RTPIPv4Address(m_destip, m_destport));
|
m_rtpSession.DeleteDestination(qrtplib::RTPAddress(m_destip, m_destport));
|
||||||
m_destip = inet_addr(address.toStdString().c_str());
|
m_destip.setAddress(address);
|
||||||
m_destip = ntohl(m_destip);
|
|
||||||
m_destport = port;
|
m_destport = port;
|
||||||
|
|
||||||
int status = m_rtpSession.AddDestination(jrtplib::RTPIPv4Address(m_destip, m_destport));
|
int status = m_rtpSession.AddDestination(qrtplib::RTPAddress(m_destip, m_destport));
|
||||||
|
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
qCritical("RTPSink::setDestination: cannot set destination address: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qCritical("RTPSink::setDestination: cannot set destination address: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RTPSink::deleteDestination(const QString& address, uint16_t port)
|
void RTPSink::deleteDestination(const QString& address, uint16_t port)
|
||||||
{
|
{
|
||||||
uint32_t destip = inet_addr(address.toStdString().c_str());
|
QHostAddress destip(address);
|
||||||
destip = ntohl(m_destip);
|
|
||||||
|
|
||||||
int status = m_rtpSession.DeleteDestination(jrtplib::RTPIPv4Address(destip, port));
|
int status = m_rtpSession.DeleteDestination(qrtplib::RTPAddress(destip, port));
|
||||||
|
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
qCritical("RTPSink::deleteDestination: cannot delete destination address: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qCritical("RTPSink::deleteDestination: cannot delete destination address: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RTPSink::addDestination(const QString& address, uint16_t port)
|
void RTPSink::addDestination(const QString& address, uint16_t port)
|
||||||
{
|
{
|
||||||
uint32_t destip = inet_addr(address.toStdString().c_str());
|
QHostAddress destip(address);
|
||||||
destip = ntohl(m_destip);
|
|
||||||
|
|
||||||
int status = m_rtpSession.AddDestination(jrtplib::RTPIPv4Address(destip, port));
|
int status = m_rtpSession.AddDestination(qrtplib::RTPAddress(destip, port));
|
||||||
|
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
qCritical("RTPSink::addDestination: cannot add destination address: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qCritical("RTPSink::addDestination: cannot add destination address: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
} else {
|
} else {
|
||||||
qDebug("RTPSink::addDestination: destination address set to %s:%d: %s",
|
qDebug("RTPSink::addDestination: destination address set to %s:%d: %s",
|
||||||
address.toStdString().c_str(),
|
address.toStdString().c_str(),
|
||||||
port,
|
port,
|
||||||
jrtplib::RTPGetErrorString(status).c_str());
|
qrtplib::RTPGetErrorString(status).c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,7 +198,7 @@ void RTPSink::write(const uint8_t *sampleByte)
|
||||||
int status = m_rtpSession.SendPacket((const void *) m_byteBuffer, (std::size_t) m_bufferSize);
|
int status = m_rtpSession.SendPacket((const void *) m_byteBuffer, (std::size_t) m_bufferSize);
|
||||||
|
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
qCritical("RTPSink::write: cannot write packet: %s", jrtplib::RTPGetErrorString(status).c_str());
|
qCritical("RTPSink::write: cannot write packet: %s", qrtplib::RTPGetErrorString(status).c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
writeNetBuf(&m_byteBuffer[0], sampleByte, elemLength(m_payloadType), m_sampleBytes, m_endianReverse);
|
writeNetBuf(&m_byteBuffer[0], sampleByte, elemLength(m_payloadType), m_sampleBytes, m_endianReverse);
|
||||||
|
|
|
@ -21,48 +21,18 @@
|
||||||
#include <QString>
|
#include <QString>
|
||||||
#include <QMutex>
|
#include <QMutex>
|
||||||
#include <QDebug>
|
#include <QDebug>
|
||||||
|
#include <QHostAddress>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
// jrtplib includes
|
// qrtplib includes
|
||||||
#include "rtpsession.h"
|
#include "rtpsession.h"
|
||||||
#include "rtpudpv4transmitternobind.h"
|
#include "rtpudptransmitter.h"
|
||||||
#include "rtpipv4address.h"
|
#include "rtpaddress.h"
|
||||||
#include "rtpsessionparams.h"
|
#include "rtpsessionparams.h"
|
||||||
#include "rtperrors.h"
|
#include "rtperrors.h"
|
||||||
#include "rtplibraryversion.h"
|
|
||||||
|
|
||||||
#include "util/export.h"
|
#include "util/export.h"
|
||||||
|
|
||||||
class SDRBASE_API RTPSinkMemoryManager : public jrtplib::RTPMemoryManager
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
RTPSinkMemoryManager()
|
|
||||||
{
|
|
||||||
alloccount = 0;
|
|
||||||
freecount = 0;
|
|
||||||
}
|
|
||||||
~RTPSinkMemoryManager()
|
|
||||||
{
|
|
||||||
qDebug() << "RTPSinkMemoryManager::~RTPSinkMemoryManager: alloc: " << alloccount << " free: " << freecount;
|
|
||||||
}
|
|
||||||
void *AllocateBuffer(size_t numbytes, int memtype)
|
|
||||||
{
|
|
||||||
void *buf = malloc(numbytes);
|
|
||||||
qDebug() << "RTPSinkMemoryManager::AllocateBuffer: Allocated " << numbytes << " bytes at location " << buf << " (memtype = " << memtype << ")";
|
|
||||||
alloccount++;
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FreeBuffer(void *p)
|
|
||||||
{
|
|
||||||
qDebug() << "RTPSinkMemoryManager::FreeBuffer: Freeing block " << p;
|
|
||||||
freecount++;
|
|
||||||
free(p);
|
|
||||||
}
|
|
||||||
private:
|
|
||||||
int alloccount,freecount;
|
|
||||||
};
|
|
||||||
|
|
||||||
class RTPSink
|
class RTPSink
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
@ -98,13 +68,12 @@ protected:
|
||||||
int m_bufferSize;
|
int m_bufferSize;
|
||||||
int m_sampleBufferIndex;
|
int m_sampleBufferIndex;
|
||||||
uint8_t *m_byteBuffer;
|
uint8_t *m_byteBuffer;
|
||||||
uint32_t m_destip;
|
QHostAddress m_destip;
|
||||||
uint16_t m_destport;
|
uint16_t m_destport;
|
||||||
jrtplib::RTPSession m_rtpSession;
|
qrtplib::RTPSession m_rtpSession;
|
||||||
jrtplib::RTPSessionParams m_rtpSessionParams;
|
qrtplib::RTPSessionParams m_rtpSessionParams;
|
||||||
jrtplib::RTPUDPv4TransmissionNoBindParams m_rtpTransmissionParams;
|
qrtplib::RTPUDPTransmissionParams m_rtpTransmissionParams;
|
||||||
jrtplib::RTPUDPv4TransmitterNoBind m_rtpTransmitter;
|
qrtplib::RTPUDPTransmitter m_rtpTransmitter;
|
||||||
RTPSinkMemoryManager m_rtpMemoryManager;
|
|
||||||
bool m_endianReverse;
|
bool m_endianReverse;
|
||||||
QMutex m_mutex;
|
QMutex m_mutex;
|
||||||
};
|
};
|
||||||
|
|
|
@ -71,6 +71,11 @@ public:
|
||||||
delete m_socket;
|
delete m_socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void moveToThread(QThread *thread)
|
||||||
|
{
|
||||||
|
m_socket->moveToThread(thread);
|
||||||
|
}
|
||||||
|
|
||||||
void setAddress(QString& address) { m_address.setAddress(address); }
|
void setAddress(QString& address) { m_address.setAddress(address); }
|
||||||
void setPort(unsigned int port) { m_port = port; }
|
void setPort(unsigned int port) { m_port = port; }
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue