diff --git a/sdrbase/ambe/ambeengine.cpp b/sdrbase/ambe/ambeengine.cpp index b5b6db954..ef93ac9a6 100644 --- a/sdrbase/ambe/ambeengine.cpp +++ b/sdrbase/ambe/ambeengine.cpp @@ -198,16 +198,16 @@ bool AMBEEngine::scan(std::vector& ambeDevices) } } -bool AMBEEngine::registerController(const std::string& ambeRef) +bool AMBEEngine::registerController(const std::string& deviceRef) { AMBEWorker *worker = new AMBEWorker(); - if (worker->open(ambeRef)) + if (worker->open(deviceRef)) { m_controllers.push_back(AMBEController()); m_controllers.back().worker = worker; m_controllers.back().thread = new QThread(); - m_controllers.back().device = ambeRef; + m_controllers.back().device = deviceRef; m_controllers.back().worker->moveToThread(m_controllers.back().thread); connect(m_controllers.back().worker, SIGNAL(finished()), m_controllers.back().thread, SLOT(quit())); @@ -220,4 +220,101 @@ bool AMBEEngine::registerController(const std::string& ambeRef) } return false; -} \ No newline at end of file +} + +void AMBEEngine::releaseController(const std::string& deviceRef) +{ + qDebug("AMBEEngine::releaseController"); + std::vector::iterator it = m_controllers.begin(); + + while (it != m_controllers.end()) + { + if (it->device == deviceRef) + { + disconnect(&it->worker->m_inputMessageQueue, SIGNAL(messageEnqueued()), it->worker, SLOT(handleInputMessages())); + it->worker->stop(); + it->thread->wait(100); + it->worker->m_inputMessageQueue.clear(); + it->worker->close(); + qDebug() << "DVSerialEngine::releaseController: closed device at: " << it->device.c_str(); + break; + } + + ++it; + } +} + +void AMBEEngine::releaseAll() +{ + qDebug("AMBEEngine::releaseAll"); + std::vector::iterator it = m_controllers.begin(); + + while (it != m_controllers.end()) + { + disconnect(&it->worker->m_inputMessageQueue, SIGNAL(messageEnqueued()), it->worker, SLOT(handleInputMessages())); + it->worker->stop(); + it->thread->wait(100); + it->worker->m_inputMessageQueue.clear(); + it->worker->close(); + qDebug() << "DVSerialEngine::release: closed device at: " << it->device.c_str(); + ++it; + } + + m_controllers.clear(); +} + +void AMBEEngine::getDeviceRefs(std::vector& deviceNames) +{ + std::vector::iterator it = m_controllers.begin(); + + while (it != m_controllers.end()) + { + deviceNames.push_back(it->device); + ++it; + } +} + +void AMBEEngine::pushMbeFrame( + const unsigned char *mbeFrame, + int mbeRateIndex, + int mbeVolumeIndex, + unsigned char channels, + bool useLP, + int upsampling, + AudioFifo *audioFifo) +{ + std::vector::iterator it = m_controllers.begin(); + std::vector::iterator itAvail = m_controllers.end(); + bool done = false; + QMutexLocker locker(&m_mutex); + + while (it != m_controllers.end()) + { + if (it->worker->hasFifo(audioFifo)) + { + it->worker->pushMbeFrame(mbeFrame, mbeRateIndex, mbeVolumeIndex, channels, useLP, upsampling, audioFifo); + done = true; + } + else if (it->worker->isAvailable()) + { + itAvail = it; + } + + ++it; + } + + if (!done) + { + if (itAvail != m_controllers.end()) + { + int wNum = itAvail - m_controllers.begin(); + + qDebug("AMBEEngine::pushMbeFrame: push %p on empty queue %d", audioFifo, wNum); + itAvail->worker->pushMbeFrame(mbeFrame, mbeRateIndex, mbeVolumeIndex, channels, useLP, upsampling, audioFifo); + } + else + { + qDebug("AMBEEngine::pushMbeFrame: no DV device available. MBE frame dropped"); + } + } +} diff --git a/sdrbase/ambe/ambeengine.h b/sdrbase/ambe/ambeengine.h index 35c193b7b..44b7cc721 100644 --- a/sdrbase/ambe/ambeengine.h +++ b/sdrbase/ambe/ambeengine.h @@ -29,6 +29,7 @@ class QThread; class AMBEWorker; +class AudioFifo; class SDRBASE_API AMBEEngine : public QObject { @@ -38,8 +39,21 @@ public: ~AMBEEngine(); bool scan(std::vector& ambeDevices); - void getDevicesNames(std::vector& devicesNames); - bool registerController(const std::string& ambeRef); + void releaseAll(); + + int getNbDevices() const { return m_controllers.size(); } //!< number of devices used + void getDeviceRefs(std::vector& devicesRefs); //!< reference of the devices used (device path or url) + bool registerController(const std::string& deviceRef); //!< create a new controller for the device in reference + void releaseController(const std::string& deviceRef); //!< release controller resources for the device in reference + + void pushMbeFrame( + const unsigned char *mbeFrame, + int mbeRateIndex, + int mbeVolumeIndex, + unsigned char channels, + bool useHP, + int upsampling, + AudioFifo *audioFifo); private: struct AMBEController diff --git a/sdrbase/ambe/ambeworker.cpp b/sdrbase/ambe/ambeworker.cpp index 07b84fe7a..59fabca85 100644 --- a/sdrbase/ambe/ambeworker.cpp +++ b/sdrbase/ambe/ambeworker.cpp @@ -16,20 +16,200 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// +#include +#include +#include + +#include "audio/audiofifo.h" #include "ambeworker.h" -AMBEWorker::AMBEWorker() -{} +MESSAGE_CLASS_DEFINITION(AMBEWorker::MsgMbeDecode, Message) +MESSAGE_CLASS_DEFINITION(AMBEWorker::MsgTest, Message) + +AMBEWorker::AMBEWorker() : + m_running(false), + m_currentGainIn(0), + m_currentGainOut(0), + m_upsamplerLastValue(0.0f), + m_phase(0), + m_upsampling(1), + m_volume(1.0f) +{ + m_audioBuffer.resize(48000); + m_audioBufferFill = 0; + m_audioFifo = 0; + std::fill(m_dvAudioSamples, m_dvAudioSamples+SerialDV::MBE_AUDIO_BLOCK_SIZE, 0); + setVolumeFactors(); +} AMBEWorker::~AMBEWorker() {} -bool AMBEWorker::open(const std::string& serialDevice) +bool AMBEWorker::open(const std::string& deviceRef) { - return m_dvController.open(serialDevice); + return m_dvController.open(deviceRef); } void AMBEWorker::close() { m_dvController.close(); } + +void AMBEWorker::process() +{ + m_running = true; + qDebug("AMBEWorker::process: started"); + + while (m_running) + { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + qDebug("AMBEWorker::process: stopped"); + emit finished(); +} + +void AMBEWorker::stop() +{ + m_running = false; +} + +void AMBEWorker::handleInputMessages() +{ + Message* message; + m_audioBufferFill = 0; + AudioFifo *audioFifo = 0; + + while ((message = m_inputMessageQueue.pop()) != 0) + { + if (MsgMbeDecode::match(*message)) + { + MsgMbeDecode *decodeMsg = (MsgMbeDecode *) message; + int dBVolume = (decodeMsg->getVolumeIndex() - 30) / 4; + float volume = pow(10.0, dBVolume / 10.0f); + int upsampling = decodeMsg->getUpsampling(); + upsampling = upsampling > 6 ? 6 : upsampling < 1 ? 1 : upsampling; + + if ((volume != m_volume) || (upsampling != m_upsampling)) + { + m_volume = volume; + m_upsampling = upsampling; + setVolumeFactors(); + } + + m_upsampleFilter.useHP(decodeMsg->getUseHP()); + + if (m_dvController.decode(m_dvAudioSamples, decodeMsg->getMbeFrame(), decodeMsg->getMbeRate())) + { + if (upsampling > 1) { + upsample(upsampling, m_dvAudioSamples, SerialDV::MBE_AUDIO_BLOCK_SIZE, decodeMsg->getChannels()); + } else { + noUpsample(m_dvAudioSamples, SerialDV::MBE_AUDIO_BLOCK_SIZE, decodeMsg->getChannels()); + } + + audioFifo = decodeMsg->getAudioFifo(); + } + else + { + qDebug("AMBEWorker::handleInputMessages: MsgMbeDecode: decode failed"); + } + } + + delete message; + } + + if (audioFifo) + { + uint res = audioFifo->write((const quint8*)&m_audioBuffer[0], m_audioBufferFill); + + if (res != m_audioBufferFill) + { + qDebug("AMBEWorker::handleInputMessages: %u/%u audio samples written", res, m_audioBufferFill); + } + } + + m_timestamp = QDateTime::currentDateTime(); +} + +void AMBEWorker::pushMbeFrame(const unsigned char *mbeFrame, + int mbeRateIndex, + int mbeVolumeIndex, + unsigned char channels, + bool useHP, + int upsampling, + AudioFifo *audioFifo) +{ + m_audioFifo = audioFifo; + m_inputMessageQueue.push(MsgMbeDecode::create(mbeFrame, mbeRateIndex, mbeVolumeIndex, channels, useHP, upsampling, audioFifo)); +} + +bool AMBEWorker::isAvailable() +{ + if (m_audioFifo == 0) { + return true; + } + + return m_timestamp.time().msecsTo(QDateTime::currentDateTime().time()) > 1000; // 1 second inactivity timeout +} + +bool AMBEWorker::hasFifo(AudioFifo *audioFifo) +{ + return m_audioFifo == audioFifo; +} + +void AMBEWorker::upsample(int upsampling, short *in, int nbSamplesIn, unsigned char channels) +{ + for (int i = 0; i < nbSamplesIn; i++) + { + //float cur = m_upsampleFilter.usesHP() ? m_upsampleFilter.runHP((float) m_compressor.compress(in[i])) : (float) m_compressor.compress(in[i]); + float cur = m_upsampleFilter.usesHP() ? m_upsampleFilter.runHP((float) in[i]) : (float) in[i]; + float prev = m_upsamplerLastValue; + qint16 upsample; + + for (int j = 1; j <= upsampling; j++) + { + upsample = (qint16) m_upsampleFilter.runLP(cur*m_upsamplingFactors[j] + prev*m_upsamplingFactors[upsampling-j]); + m_audioBuffer[m_audioBufferFill].l = channels & 1 ? m_compressor.compress(upsample) : 0; + m_audioBuffer[m_audioBufferFill].r = (channels>>1) & 1 ? m_compressor.compress(upsample) : 0; + + if (m_audioBufferFill < m_audioBuffer.size() - 1) + { + ++m_audioBufferFill; + } + else + { + qDebug("AMBEWorker::upsample6: audio buffer is full check its size"); + } + } + + m_upsamplerLastValue = cur; + } +} + +void AMBEWorker::noUpsample(short *in, int nbSamplesIn, unsigned char channels) +{ + for (int i = 0; i < nbSamplesIn; i++) + { + float cur = m_upsampleFilter.usesHP() ? m_upsampleFilter.runHP((float) in[i]) : (float) in[i]; + m_audioBuffer[m_audioBufferFill].l = channels & 1 ? cur*m_upsamplingFactors[0] : 0; + m_audioBuffer[m_audioBufferFill].r = (channels>>1) & 1 ? cur*m_upsamplingFactors[0] : 0; + + if (m_audioBufferFill < m_audioBuffer.size() - 1) + { + ++m_audioBufferFill; + } + else + { + qDebug("DVSerialWorker::noUpsample: audio buffer is full check its size"); + } + } +} + +void AMBEWorker::setVolumeFactors() +{ + m_upsamplingFactors[0] = m_volume; + + for (int i = 1; i <= m_upsampling; i++) { + m_upsamplingFactors[i] = (i*m_volume) / (float) m_upsampling; + } +} diff --git a/sdrbase/ambe/ambeworker.h b/sdrbase/ambe/ambeworker.h index 7327030a2..e6136c139 100644 --- a/sdrbase/ambe/ambeworker.h +++ b/sdrbase/ambe/ambeworker.h @@ -20,24 +20,137 @@ #define SDRBASE_AMBE_AMBEWORKER_H_ #include +#include +#include -#include "util/messagequeue.h" #include "export.h" #include "dvcontroller.h" +#include "util/messagequeue.h" +#include "util/message.h" +#include "dsp/filtermbe.h" +#include "dsp/dsptypes.h" +#include "audio/audiocompressor.h" + +class AudioFifo; + class SDRBASE_API AMBEWorker : public QObject { Q_OBJECT public: + class MsgTest : public Message + { + MESSAGE_CLASS_DECLARATION + public: + static MsgTest* create() { return new MsgTest(); } + private: + MsgTest() {} + }; + + class MsgMbeDecode : public Message + { + MESSAGE_CLASS_DECLARATION + public: + const unsigned char *getMbeFrame() const { return m_mbeFrame; } + SerialDV::DVRate getMbeRate() const { return m_mbeRate; } + int getVolumeIndex() const { return m_volumeIndex; } + unsigned char getChannels() const { return m_channels % 4; } + bool getUseHP() const { return m_useHP; } + int getUpsampling() const { return m_upsampling; } + AudioFifo *getAudioFifo() { return m_audioFifo; } + + static MsgMbeDecode* create( + const unsigned char *mbeFrame, + int mbeRateIndex, + int volumeIndex, + unsigned char channels, + bool useHP, + int upsampling, + AudioFifo *audioFifo) + { + return new MsgMbeDecode(mbeFrame, (SerialDV::DVRate) mbeRateIndex, volumeIndex, channels, useHP, upsampling, audioFifo); + } + + private: + unsigned char m_mbeFrame[SerialDV::MBE_FRAME_MAX_LENGTH_BYTES]; + SerialDV::DVRate m_mbeRate; + int m_volumeIndex; + unsigned char m_channels; + bool m_useHP; + int m_upsampling; + AudioFifo *m_audioFifo; + + MsgMbeDecode(const unsigned char *mbeFrame, + SerialDV::DVRate mbeRate, + int volumeIndex, + unsigned char channels, + bool useHP, + int upsampling, + AudioFifo *audioFifo) : + Message(), + m_mbeRate(mbeRate), + m_volumeIndex(volumeIndex), + m_channels(channels), + m_useHP(useHP), + m_upsampling(upsampling), + m_audioFifo(audioFifo) + { + memcpy((void *) m_mbeFrame, (const void *) mbeFrame, SerialDV::DVController::getNbMbeBytes(m_mbeRate)); + } + }; + AMBEWorker(); ~AMBEWorker(); - bool open(const std::string& serialDevice); + void pushMbeFrame(const unsigned char *mbeFrame, + int mbeRateIndex, + int mbeVolumeIndex, + unsigned char channels, + bool useHP, + int upsampling, + AudioFifo *audioFifo); + + bool open(const std::string& deviceRef); //!< Either serial device or ip:port void close(); + void process(); + void stop(); + bool isAvailable(); + bool hasFifo(AudioFifo *audioFifo); + + void postTest() + { + //emit inputMessageReady(); + m_inputMessageQueue.push(MsgTest::create()); + } MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication +signals: + void finished(); + +public slots: + void handleInputMessages(); + private: + void upsample(int upsampling, short *in, int nbSamplesIn, unsigned char channels); + void noUpsample(short *in, int nbSamplesIn, unsigned char channels); + void setVolumeFactors(); + SerialDV::DVController m_dvController; + AudioFifo *m_audioFifo; + QDateTime m_timestamp; + volatile bool m_running; + int m_currentGainIn; + int m_currentGainOut; + short m_dvAudioSamples[SerialDV::MBE_AUDIO_BLOCK_SIZE]; + AudioVector m_audioBuffer; + uint m_audioBufferFill; + float m_upsamplerLastValue; + float m_phase; + MBEAudioInterpolatorFilter m_upsampleFilter; + int m_upsampling; + float m_volume; + float m_upsamplingFactors[7]; + AudioCompressor m_compressor; }; #endif // SDRBASE_AMBE_AMBEWORKER_H_