From b196fbfdea302a67e25325e97adf21b37e700058 Mon Sep 17 00:00:00 2001 From: "Charles J. Cliffe" Date: Thu, 18 Dec 2014 20:11:25 -0500 Subject: [PATCH] Basic mixer for OSX -- multi demod streams working MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RtAudio can’t open multiple streams, so now opening a new device will start a static audio thread and all other threads will attach/detach their input queues there. --- src/CubicSDR.cpp | 4 + src/CubicSDRDefs.h | 2 +- src/audio/AudioThread.cpp | 212 +++++++++++++++++++++++++---- src/audio/AudioThread.h | 26 +++- src/demod/DemodulatorInstance.cpp | 13 +- src/demod/DemodulatorInstance.h | 10 +- src/demod/DemodulatorPreThread.cpp | 8 +- src/demod/DemodulatorThread.cpp | 15 +- src/sdr/SDRPostThread.cpp | 25 ++-- src/sdr/SDRThread.cpp | 12 +- src/visual/WaterfallCanvas.cpp | 2 +- 11 files changed, 271 insertions(+), 58 deletions(-) diff --git a/src/CubicSDR.cpp b/src/CubicSDR.cpp index 7a35fc1..fc78863 100644 --- a/src/CubicSDR.cpp +++ b/src/CubicSDR.cpp @@ -72,6 +72,10 @@ int CubicSDR::OnExit() { demodMgr.terminateAll(); +#ifdef __APPLE__ + AudioThread::deviceCleanup(); +#endif + delete threadCmdQueueSDR; delete iqVisualQueue; diff --git a/src/CubicSDRDefs.h b/src/CubicSDRDefs.h index 404cb56..d54cf73 100644 --- a/src/CubicSDRDefs.h +++ b/src/CubicSDRDefs.h @@ -1,7 +1,7 @@ #pragma once #ifdef __APPLE__ -#define BUF_SIZE (16384*3) +#define BUF_SIZE (16384*2) #define SRATE 2000000 #else #define BUF_SIZE (16384*4) diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index d2ee47d..0a31186 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -1,54 +1,165 @@ #include "AudioThread.h" #include "CubicSDRDefs.h" #include +#include #include "DemodulatorThread.h" -AudioThread::AudioThread(AudioThreadInputQueue *inputQueue, DemodulatorThreadCommandQueue* threadQueueNotify) : - inputQueue(inputQueue), terminated(false), audio_queue_ptr(0), underflow_count(0), threadQueueNotify(threadQueueNotify) { +#ifdef __APPLE__ +std::map AudioThread::deviceController; +std::map AudioThread::deviceThread; +#endif +AudioThread::AudioThread(AudioThreadInputQueue *inputQueue, DemodulatorThreadCommandQueue* threadQueueNotify) : + inputQueue(inputQueue), terminated(false), audio_queue_ptr(0), underflow_count(0), threadQueueNotify(threadQueueNotify), gain(1.0), active( + false) { +#ifdef __APPLE__ + boundThreads = new std::vector; +#endif } AudioThread::~AudioThread() { +#ifdef __APPLE__ + delete boundThreads.load(); +#endif +} +#ifdef __APPLE__ +void AudioThread::bindThread(AudioThread *other) { + boundThreads.load()->push_back(other); +} + +void AudioThread::removeThread(AudioThread *other) { + std::vector::iterator i; + i = std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other); + if (i != boundThreads.load()->end()) { + boundThreads.load()->erase(i); + } +} + +void AudioThread::deviceCleanup() { + std::map::iterator i; + + for (i = deviceController.begin(); i != deviceController.end(); i++) { + i->second->terminate(); + } } static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBufferFrames, double streamTime, RtAudioStreamStatus status, void *userData) { AudioThread *src = (AudioThread *) userData; float *out = (float*) outputBuffer; + memset(out, 0, nBufferFrames * 2 * sizeof(float)); if (status) { std::cout << "Audio buffer underflow.." << (src->underflow_count++) << std::endl; } - if (src->audio_queue_ptr == src->currentInput.data.size()) { - if (src->terminated) { - return 1; - } - src->inputQueue->pop(src->currentInput); - src->audio_queue_ptr = 0; + if (!src->boundThreads.load()->empty()) { + src->gain = 1.0 / src->boundThreads.load()->size(); + } else { + return 0; } - for (int i = 0; i < nBufferFrames * 2; i++) { - out[i] = src->currentInput.data[src->audio_queue_ptr]; - src->audio_queue_ptr++; - if (src->audio_queue_ptr == src->currentInput.data.size()) { - if (src->terminated) { - return 1; + for (int j = 0; j < src->boundThreads.load()->size(); j++) { + AudioThread *srcmix = (*(src->boundThreads.load()))[j]; + if (srcmix->terminated || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) { + continue; + } + + for (int i = 0; i < nBufferFrames * 2; i++) { + if (srcmix->audio_queue_ptr >= srcmix->currentInput.data.size()) { + srcmix->inputQueue->pop(srcmix->currentInput); + srcmix->audio_queue_ptr = 0; } - src->inputQueue->pop(src->currentInput); - src->audio_queue_ptr = 0; + out[i] = out[i] + srcmix->currentInput.data[srcmix->audio_queue_ptr] * src->gain; + srcmix->audio_queue_ptr++; } } return 0; } +#else + +static int audioCallback(void *outputBuffer, void *inputBuffer, unsigned int nBufferFrames, double streamTime, RtAudioStreamStatus status, + void *userData) { + AudioThread *src = (AudioThread *) userData; + float *out = (float*) outputBuffer; + memset(out, 0, nBufferFrames * 2 * sizeof(float)); + if (status) { + std::cout << "Audio buffer underflow.." << (src->underflow_count++) << std::endl; + } + + for (int i = 0; i < nBufferFrames * 2; i++) { + if (src->audio_queue_ptr >= src->currentInput.data.size()) { + if (src->terminated) { + break; + } + src->inputQueue->pop(src->currentInput); + src->audio_queue_ptr = 0; + } + out[i] = src->currentInput.data[src->audio_queue_ptr] * src->gain; + src->audio_queue_ptr++; + } + + return 0; +} +#endif + + +void AudioThread::enumerateDevices() { + int numDevices = dac.getDeviceCount(); + + for (int i = 0; i < numDevices; i++) { + RtAudio::DeviceInfo info = dac.getDeviceInfo(i); + + std::cout << std::endl; + + std::cout << "Audio Device #" << i << " " << info.name << std::endl; + std::cout << "\tDefault Output? " << (info.isDefaultOutput ? "Yes" : "No") << std::endl; + std::cout << "\tDefault Input? " << (info.isDefaultOutput ? "Yes" : "No") << std::endl; + std::cout << "\tInput channels: " << info.inputChannels << std::endl; + std::cout << "\tOutput channels: " << info.outputChannels << std::endl; + std::cout << "\tDuplex channels: " << info.duplexChannels << std::endl; + + std::cout << "\t" << "Native formats:" << std::endl; + RtAudioFormat nFormats = info.nativeFormats; + if (nFormats & RTAUDIO_SINT8) { + std::cout << "\t\t8-bit signed integer." << std::endl; + } + if (nFormats & RTAUDIO_SINT16) { + std::cout << "\t\t16-bit signed integer." << std::endl; + } + if (nFormats & RTAUDIO_SINT24) { + std::cout << "\t\t24-bit signed integer." << std::endl; + } + if (nFormats & RTAUDIO_SINT32) { + std::cout << "\t\t32-bit signed integer." << std::endl; + } + if (nFormats & RTAUDIO_FLOAT32) { + std::cout << "\t\t32-bit float normalized between plus/minus 1.0." << std::endl; + } + if (nFormats & RTAUDIO_FLOAT64) { + std::cout << "\t\t32-bit float normalized between plus/minus 1.0." << std::endl; + } + + std::vector::iterator srate; + + std::cout << "\t" << "Supported sample rates:" << std::endl; + + for (srate = info.sampleRates.begin(); srate != info.sampleRates.end(); srate++) { + std::cout << "\t\t" << (*srate) << "hz" << std::endl; + } + + std::cout << std::endl; + } +} + void AudioThread::threadMain() { #ifdef __APPLE__ pthread_t tID = pthread_self(); // ID of this thread - int priority = sched_get_priority_min( SCHED_RR ); - sched_param prio = {priority}; // scheduling priority of thread - pthread_setschedparam( tID, SCHED_RR, &prio ); + int priority = sched_get_priority_max( SCHED_RR) - 1; + sched_param prio = { priority }; // scheduling priority of thread + pthread_setschedparam(tID, SCHED_RR, &prio); #endif std::cout << "Audio thread initializing.." << std::endl; @@ -58,7 +169,6 @@ void AudioThread::threadMain() { return; } - RtAudio::StreamParameters parameters; parameters.deviceId = dac.getDefaultOutputDevice(); parameters.nChannels = 2; parameters.firstChannel = 0; @@ -70,11 +180,27 @@ void AudioThread::threadMain() { // | RTAUDIO_MINIMIZE_LATENCY; // opts.flags = RTAUDIO_MINIMIZE_LATENCY; opts.streamName = "CubicSDR Audio Output"; -// opts.priority = sched_get_priority_max(SCHED_FIFO); + opts.priority = sched_get_priority_max(SCHED_FIFO); try { + +#ifdef __APPLE__ + if (deviceController.find(parameters.deviceId) == deviceController.end()) { + deviceController[parameters.deviceId] = new AudioThread(NULL, NULL); + deviceController[parameters.deviceId]->bindThread(this); + deviceThread[parameters.deviceId] = new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId]); + } else if (deviceController[parameters.deviceId] == this) { + dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &bufferFrames, &audioCallback, (void *) this, &opts); + dac.startStream(); + } else { + deviceController[parameters.deviceId]->bindThread(this); + } + active = true; +#else dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &bufferFrames, &audioCallback, (void *) this, &opts); dac.startStream(); + +#endif } catch (RtAudioError& e) { e.printMessage(); return; @@ -85,9 +211,22 @@ void AudioThread::threadMain() { cmdQueue.pop(command); } +#ifdef __APPLE__ + if (deviceController[parameters.deviceId] != this) { + deviceController[parameters.deviceId]->removeThread(this); + } else { + try { + dac.stopStream(); + dac.closeStream(); + } catch (RtAudioError& e) { + e.printMessage(); + } + } +#else try { // Stop the stream dac.stopStream(); + dac.closeStream(); } catch (RtAudioError& e) { e.printMessage(); } @@ -95,12 +234,15 @@ void AudioThread::threadMain() { if (dac.isStreamOpen()) { dac.closeStream(); } +#endif std::cout << "Audio thread done." << std::endl; - DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED); - tCmd.context = this; - threadQueueNotify->push(tCmd); + if (threadQueueNotify != NULL) { + DemodulatorThreadCommand tCmd(DemodulatorThreadCommand::DEMOD_THREAD_CMD_AUDIO_TERMINATED); + tCmd.context = this; + threadQueueNotify->push(tCmd); + } } void AudioThread::terminate() { @@ -108,3 +250,25 @@ void AudioThread::terminate() { AudioThreadCommand endCond; // push an empty input to bump the queue cmdQueue.push(endCond); } + +bool AudioThread::isActive() { + return active; +} + +void AudioThread::setActive(bool state) { +#ifdef __APPLE__ + AudioThreadInput dummy; + if (state && !active) { + deviceController[parameters.deviceId]->bindThread(this); + while (!inputQueue->empty()) { // flush queue + inputQueue->pop(dummy); + } + } else if (!state && active) { + deviceController[parameters.deviceId]->removeThread(this); + while (!inputQueue->empty()) { // flush queue + inputQueue->pop(dummy); + } + } +#endif + active = state; +} diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index cef6018..665278c 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include "wx/wxprec.h" @@ -28,11 +29,11 @@ public: class AudioThreadCommand { public: enum AudioThreadCommandEnum { - AUTIO_THREAD_CMD_NULL, AUTIO_THREAD_CMD_SET_DEVICE, + AUDIO_THREAD_CMD_NULL, AUDIO_THREAD_CMD_SET_DEVICE }; AudioThreadCommand() : - cmd(AUTIO_THREAD_CMD_NULL), int_value(0) { + cmd(AUDIO_THREAD_CMD_NULL), int_value(0) { } AudioThreadCommandEnum cmd; @@ -54,12 +55,33 @@ public: AudioThread(AudioThreadInputQueue *inputQueue, DemodulatorThreadCommandQueue* threadQueueNotify); ~AudioThread(); + void enumerateDevices(); + void threadMain(); void terminate(); + bool isActive(); + void setActive(bool state); + +#ifdef __APPLE__ + void bindThread(AudioThread *other); + void removeThread(AudioThread *other); +#endif + private: RtAudio dac; + RtAudio::StreamParameters parameters; AudioThreadCommandQueue cmdQueue; DemodulatorThreadCommandQueue* threadQueueNotify; + +#ifdef __APPLE__ +public: + static std::map deviceController; + static std::map deviceThread; + static void deviceCleanup(); + std::atomic *> boundThreads; + float gain; + std::atomic active; +#endif }; diff --git a/src/demod/DemodulatorInstance.cpp b/src/demod/DemodulatorInstance.cpp index a3ab318..48297de 100644 --- a/src/demod/DemodulatorInstance.cpp +++ b/src/demod/DemodulatorInstance.cpp @@ -2,7 +2,7 @@ DemodulatorInstance::DemodulatorInstance() : t_Demod(NULL), t_PreDemod(NULL), t_Audio(NULL), threadQueueDemod(NULL), demodulatorThread(NULL), terminated(false), audioTerminated(false), demodTerminated( - false), preDemodTerminated(false) { + false), preDemodTerminated(false), active(false) { label = new std::string("Unnamed"); threadQueueDemod = new DemodulatorThreadInputQueue; @@ -42,14 +42,12 @@ void DemodulatorInstance::run() { pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, 2048000); pthread_attr_getstacksize(&attr, &size); - pthread_attr_setschedpolicy(&attr, SCHED_RR); pthread_create(&t_PreDemod, &attr, &DemodulatorPreThread::pthread_helper, demodulatorPreThread); pthread_attr_destroy(&attr); pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, 2048000); pthread_attr_getstacksize(&attr, &size); - pthread_attr_setschedpolicy(&attr, SCHED_RR); pthread_create(&t_Demod, &attr, &DemodulatorThread::pthread_helper, demodulatorThread); pthread_attr_destroy(&attr); @@ -59,6 +57,7 @@ void DemodulatorInstance::run() { t_PreDemod = new std::thread(&DemodulatorPreThread::threadMain, demodulatorPreThread); t_Demod = new std::thread(&DemodulatorThread::threadMain, demodulatorThread); #endif + active = true; } void DemodulatorInstance::updateLabel(int freq) { @@ -137,3 +136,11 @@ bool DemodulatorInstance::isTerminated() { return terminated; } +bool DemodulatorInstance::isActive() { + return active; +} + +void DemodulatorInstance::setActive(bool state) { + active = state; + audioThread->setActive(state); +} diff --git a/src/demod/DemodulatorInstance.h b/src/demod/DemodulatorInstance.h index a43846c..9b759ce 100644 --- a/src/demod/DemodulatorInstance.h +++ b/src/demod/DemodulatorInstance.h @@ -44,7 +44,15 @@ public: bool isTerminated(); void updateLabel(int freq); + bool isActive(); + void setActive(bool state); + private: - std::atomic label;bool terminated;bool demodTerminated;bool audioTerminated;bool preDemodTerminated; + std::atomic label; + bool terminated; + bool demodTerminated; + bool audioTerminated; + bool preDemodTerminated; + std::atomic active; }; diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index 6fe202e..b872dc5 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -88,10 +88,10 @@ void *DemodulatorPreThread::threadMain() { void DemodulatorPreThread::threadMain() { #endif #ifdef __APPLE__ - pthread_t tID = pthread_self(); // ID of this thread - int priority = sched_get_priority_min( SCHED_RR ); - sched_param prio = {priority}; // scheduling priority of thread - pthread_setschedparam( tID, SCHED_RR, &prio ); + pthread_t tID = pthread_self(); // ID of this thread + int priority = sched_get_priority_max( SCHED_FIFO )-1; + sched_param prio = { priority }; // scheduling priority of thread + pthread_setschedparam(tID, SCHED_FIFO, &prio); #endif if (!initialized) { diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index c436477..5ce1f3d 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -22,10 +22,10 @@ void *DemodulatorThread::threadMain() { void DemodulatorThread::threadMain() { #endif #ifdef __APPLE__ - pthread_t tID = pthread_self(); // ID of this thread - int priority = sched_get_priority_min( SCHED_RR ); - sched_param prio = {priority}; // scheduling priority of thread - pthread_setschedparam( tID, SCHED_RR, &prio ); + pthread_t tID = pthread_self(); // ID of this thread + int priority = sched_get_priority_max( SCHED_FIFO )-1; + sched_param prio = { priority }; // scheduling priority of thread + pthread_setschedparam(tID, SCHED_FIFO, &prio); #endif msresamp_crcf audio_resampler = NULL; @@ -93,8 +93,11 @@ void DemodulatorThread::threadMain() { audioInputQueue->push(ati); } - if (visOutQueue != NULL) { - visOutQueue->push(ati); + if (visOutQueue != NULL && visOutQueue->empty()) { + AudioThreadInput ati_vis; + ati_vis.data.assign(demod_output,demod_output+num_written); + visOutQueue->push(ati_vis); +// visOutQueue->push(ati); } } diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index 6868fab..5487c68 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -42,12 +42,12 @@ void SDRPostThread::threadMain() { int n_read; double seconds = 0.0; -//#ifdef __APPLE__ -// pthread_t tID = pthread_self(); // ID of this thread -// int priority = sched_get_priority_min( SCHED_RR ); -// sched_param prio = { priority }; // scheduling priority of thread -// pthread_setschedparam( tID, SCHED_RR, &prio ); -//#endif +#ifdef __APPLE__ + pthread_t tID = pthread_self(); // ID of this thread + int priority = sched_get_priority_max( SCHED_FIFO) - 1; + sched_param prio = { priority }; // scheduling priority of thread + pthread_setschedparam(tID, SCHED_FIFO, &prio); +#endif dcFilter = iirfilt_crcf_create_dc_blocker(0.0005); @@ -96,15 +96,20 @@ void SDRPostThread::threadMain() { demodDataOut.bandwidth = data_in.bandwidth; demodDataOut.data = data_in.data; - for (int i = 0, iMax = demodulators.size(); i < iMax; i++) { - DemodulatorInstance *demod = demodulators[i]; + std::vector::iterator i; + for (i = demodulators.begin(); i != demodulators.end(); i++) { + DemodulatorInstance *demod = *i; DemodulatorThreadInputQueue *demodQueue = demod->threadQueueDemod; - if (demod->getParams().frequency != data_in.frequency) { - if (abs(data_in.frequency - demod->getParams().frequency) > (int) ((float) ((float) SRATE / 2.0) * 1.15)) { + if (demod->getParams().frequency != data_in.frequency + && abs(data_in.frequency - demod->getParams().frequency) > (int) ((float) ((float) SRATE / 2.0) * 1.15)) { + if (demod->isActive()) { + demod->setActive(false); demodQueue->push(dummyDataOut); continue; } + } else if (!demod->isActive()) { + demod->setActive(true); } demodQueue->push(demodDataOut); diff --git a/src/sdr/SDRThread.cpp b/src/sdr/SDRThread.cpp index a4b9d56..adfffb0 100644 --- a/src/sdr/SDRThread.cpp +++ b/src/sdr/SDRThread.cpp @@ -89,12 +89,12 @@ int SDRThread::enumerate_rtl() { } void SDRThread::threadMain() { -//#ifdef __APPLE__ -// pthread_t tID = pthread_self(); // ID of this thread -// int priority = sched_get_priority_min( SCHED_RR ); -// sched_param prio = { priority }; // scheduling priority of thread -// pthread_setschedparam( tID, SCHED_RR, &prio ); -//#endif +#ifdef __APPLE__ + pthread_t tID = pthread_self(); // ID of this thread + int priority = sched_get_priority_max( SCHED_FIFO )-1; + sched_param prio = { priority }; // scheduling priority of thread + pthread_setschedparam(tID, SCHED_FIFO, &prio); +#endif std::cout << "SDR thread initializing.." << std::endl; diff --git a/src/visual/WaterfallCanvas.cpp b/src/visual/WaterfallCanvas.cpp index ea1cf8d..ed6cd80 100644 --- a/src/visual/WaterfallCanvas.cpp +++ b/src/visual/WaterfallCanvas.cpp @@ -176,8 +176,8 @@ void WaterfallCanvas::OnKeyDown(wxKeyEvent& event) { if (!activeDemod) { break; } - wxGetApp().getDemodMgr().deleteThread(activeDemod); wxGetApp().removeDemodulator(activeDemod); + wxGetApp().getDemodMgr().deleteThread(activeDemod); break; default: