diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index e5db98b..a8df680 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -14,13 +14,13 @@ //50 ms #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) -std::map AudioThread::deviceController; +std::map AudioThread::deviceController; + std::map AudioThread::deviceSampleRate; -std::map AudioThread::deviceThread; std::recursive_mutex AudioThread::m_device_mutex; -AudioThread::AudioThread() : IOThread(), nBufferFrames(1024), sampleRate(0) { +AudioThread::AudioThread() : IOThread(), nBufferFrames(1024), sampleRate(0), controllerThread(nullptr) { audioQueuePtr = 0; underflowCount = 0; @@ -31,6 +31,14 @@ AudioThread::AudioThread() : IOThread(), nBufferFrames(1024), sampleRate(0) { AudioThread::~AudioThread() { std::lock_guard lock(m_mutex); + + if (controllerThread != nullptr) { + + controllerThread->join(); + delete controllerThread; + + controllerThread = nullptr; + } } std::recursive_mutex & AudioThread::getMutex() @@ -38,6 +46,18 @@ std::recursive_mutex & AudioThread::getMutex() return m_mutex; } +void AudioThread::attachControllerThread(std::thread* controllerThread_in) { + + //cleanup previous (should never happen) + if (controllerThread != nullptr) { + + controllerThread->join(); + delete controllerThread; + } + + controllerThread = controllerThread_in; +} + void AudioThread::bindThread(AudioThread *other) { std::lock_guard lock(m_mutex); @@ -62,9 +82,23 @@ void AudioThread::deviceCleanup() { std::lock_guard lock(m_device_mutex); - for (auto i = deviceController.begin(); i != deviceController.end(); i++) { - i->second->terminate(); + auto it = deviceController.begin(); + + std::cout << "Final audio management cleanup, terminating " << deviceController.size() << " device controllers..." << std::endl << std::flush; + + while (it != deviceController.end()) { + + //notify termination... + it->second->terminate(); + + //deletion of it->second will take care of the controllerThread: + delete it->second; + + //next device + it++; } + + std::cout << "Final audio management cleanup complete..." << std::endl << std::flush; } static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status, @@ -76,8 +110,10 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned //actually active. ::memset(out, 0, nBufferFrames * 2 * sizeof(float)); + //src in the controller thread: AudioThread *src = (AudioThread *) userData; + //by construction, src is a controller thread, from deviceController: std::lock_guard lock(src->getMutex()); if (src->isTerminated()) { @@ -88,17 +124,12 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl << std::flush; } - if (src->boundThreads.empty()) { - return 0; - } - - double peak = 0.0; - //for all boundThreads + //Process the bound threads audio: for (size_t j = 0; j < src->boundThreads.size(); j++) { - AudioThread *srcmix = src->boundThreads[j]; + AudioThread *srcmix = src->boundThreads[j]; //lock every single boundThread srcmix in succession the time we process //its audio samples. @@ -280,7 +311,7 @@ void AudioThread::enumerateDevices(std::vector &devs) { void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { - AudioThread* matchingAudioThread = nullptr; + AudioThread* matchingControllerThread = nullptr; //scope lock here to minimize the common unique static lock contention { @@ -288,45 +319,49 @@ void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { if (deviceController.find(deviceId) != deviceController.end()) { - matchingAudioThread = deviceController[deviceId]; + matchingControllerThread = deviceController[deviceId]; } } //out-of-lock test - if (matchingAudioThread != nullptr) { + if (matchingControllerThread != nullptr) { AudioThreadCommand refreshDevice; refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE; refreshDevice.int_value = sampleRate; //VSO : blocking push ! - matchingAudioThread->getCommandQueue()->push(refreshDevice); + matchingControllerThread->getCommandQueue()->push(refreshDevice); } } void AudioThread::setSampleRate(int sampleRate) { - bool outputIsThis = false; + bool thisIsAController = false; //scope lock here to minimize the common unique static lock contention { std::lock_guard lock(m_device_mutex); if (deviceController[outputDevice.load()] == this) { - outputIsThis = true; + thisIsAController = true; deviceSampleRate[outputDevice.load()] = sampleRate; } } std::lock_guard lock(m_mutex); - if (outputIsThis) { + if (thisIsAController) { dac.stopStream(); dac.closeStream(); + //Set bounded sample rate: for (size_t j = 0; j < boundThreads.size(); j++) { AudioThread *srcmix = boundThreads[j]; - srcmix->setSampleRate(sampleRate); + // the controller thread is part of the boundedThreads, so prevent infinite recursion: + if (srcmix != this) { + srcmix->setSampleRate(sampleRate); + } } //make a local copy, snapshot of the list of demodulators @@ -364,6 +399,11 @@ void AudioThread::setupDevice(int deviceId) { try { if (deviceController.find(outputDevice.load()) != deviceController.end()) { + //'this' is not the controller, so remove it from the bounded list: + //beware, we must take the controller mutex, because the audio callback may use the list of bounded + //threads at that moment: + std::lock_guard lock(deviceController[outputDevice.load()]->getMutex()); + deviceController[outputDevice.load()]->removeThread(this); } #ifndef _MSC_VER @@ -381,18 +421,29 @@ void AudioThread::setupDevice(int deviceId) { // deviceSampleRate[parameters.deviceId] = sampleRate; } + //Create a new controller: if (deviceController.find(parameters.deviceId) == deviceController.end()) { + + //Create a new controller thread for parameters.deviceId: deviceController[parameters.deviceId] = new AudioThread(); deviceController[parameters.deviceId]->setInitOutputDevice(parameters.deviceId, sampleRate); + // BEWARE: the controller add itself to the list of boundThreads ! deviceController[parameters.deviceId]->bindThread(this); - deviceThread[parameters.deviceId] = new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId]); - } else if (deviceController[parameters.deviceId] == this) { - //Attach callback - dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *) this, &opts); - dac.startStream(); - } else { + deviceController[parameters.deviceId]->attachControllerThread(new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId])); + + } else if (deviceController[parameters.deviceId] == this) { + + //Attach callback + dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *)this, &opts); + dac.startStream(); + } else { + //we are a bound thread, add ourselves to the controller deviceController[parameters.deviceId]. + //beware, we must take the controller mutex, because the audio callback may use the list of bounded + //threads at that moment: + std::lock_guard lock(deviceController[parameters.deviceId]->getMutex()); + deviceController[parameters.deviceId]->bindThread(this); } active = true; @@ -467,17 +518,13 @@ void AudioThread::run() { if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE) { setSampleRate(command.int_value); } - } + } //end while // Drain any remaining inputs, with a non-blocking pop if (inputQueue != nullptr) { inputQueue->flush(); } - //Thread termination, prevent fancy things to happen, lock the whole thing: - //This way audioThreadCallback is rightly protected from thread termination - std::lock_guard lock(m_mutex); - //Nullify currentInput... currentInput = nullptr; @@ -485,8 +532,14 @@ void AudioThread::run() { std::lock_guard global_lock(m_device_mutex); if (deviceController[parameters.deviceId] != this) { + //'this' is not the controller, so remove it from the bounded list: + //beware, we must take the controller mutex, because the audio callback may use the list of bounded + //threads at that moment: + std::lock_guard lock(deviceController[parameters.deviceId]->getMutex()); + deviceController[parameters.deviceId]->removeThread(this); } else { + // 'this' is a controller thread: try { if (dac.isStreamOpen()) { if (dac.isStreamRunning()) { @@ -514,7 +567,9 @@ bool AudioThread::isActive() { void AudioThread::setActive(bool state) { - AudioThread* matchingAudioThread = nullptr; + AudioThread* matchingControllerThread = nullptr; + + std::lock_guard lock(m_mutex); //scope lock here to minimize the common unique static lock contention { @@ -522,20 +577,18 @@ void AudioThread::setActive(bool state) { if (deviceController.find(parameters.deviceId) != deviceController.end()) { - matchingAudioThread = deviceController[parameters.deviceId]; + matchingControllerThread = deviceController[parameters.deviceId]; } } - std::lock_guard lock(m_mutex); - - if (matchingAudioThread == nullptr) { + if (matchingControllerThread == nullptr) { return; } if (state && !active && inputQueue) { - matchingAudioThread->bindThread(this); + matchingControllerThread->bindThread(this); } else if (!state && active) { - matchingAudioThread->removeThread(this); + matchingControllerThread->removeThread(this); } // Activity state changing, clear any inputs diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index 7585faa..0149193 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include "ThreadBlockingQueue.h" #include "RtAudio.h" @@ -110,6 +111,9 @@ public: static void deviceCleanup(); static void setDeviceSampleRate(int deviceId, int sampleRate); + // + void attachControllerThread(std::thread* controllerThread); + //fields below, only to be used by other AudioThreads ! size_t underflowCount; //protected by m_mutex @@ -131,6 +135,9 @@ private: AudioThreadCommandQueue cmdQueue; int sampleRate; + //if != nullptr, it mean AudioThread is a controller thread. + std::thread* controllerThread = nullptr; + //The own m_mutex protecting this AudioThread, in particular boundThreads std::recursive_mutex m_mutex; @@ -140,10 +147,8 @@ private: void bindThread(AudioThread *other); void removeThread(AudioThread *other); - static std::map deviceController; - static std::map deviceThread; - + static std::map deviceController; + //The mutex protecting static deviceController, deviceThread and deviceSampleRate access. static std::recursive_mutex m_device_mutex; }; -