AudioThread: Rework mutex usage (again) + added proper cleanups (hopefully)

This commit is contained in:
vsonnier 2018-01-21 14:14:00 +01:00
parent d8ac9559fe
commit 36224defd7
2 changed files with 100 additions and 42 deletions

View File

@ -14,13 +14,13 @@
//50 ms //50 ms
#define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000)
std::map<int, AudioThread *> AudioThread::deviceController; std::map<int, AudioThread* > AudioThread::deviceController;
std::map<int, int> AudioThread::deviceSampleRate; std::map<int, int> AudioThread::deviceSampleRate;
std::map<int, std::thread *> AudioThread::deviceThread;
std::recursive_mutex AudioThread::m_device_mutex; std::recursive_mutex AudioThread::m_device_mutex;
AudioThread::AudioThread() : IOThread(), nBufferFrames(1024), sampleRate(0) { AudioThread::AudioThread() : IOThread(), nBufferFrames(1024), sampleRate(0), controllerThread(nullptr) {
audioQueuePtr = 0; audioQueuePtr = 0;
underflowCount = 0; underflowCount = 0;
@ -31,6 +31,14 @@ AudioThread::AudioThread() : IOThread(), nBufferFrames(1024), sampleRate(0) {
AudioThread::~AudioThread() { AudioThread::~AudioThread() {
std::lock_guard<std::recursive_mutex> lock(m_mutex); std::lock_guard<std::recursive_mutex> lock(m_mutex);
if (controllerThread != nullptr) {
controllerThread->join();
delete controllerThread;
controllerThread = nullptr;
}
} }
std::recursive_mutex & AudioThread::getMutex() std::recursive_mutex & AudioThread::getMutex()
@ -38,6 +46,18 @@ std::recursive_mutex & AudioThread::getMutex()
return m_mutex; return m_mutex;
} }
void AudioThread::attachControllerThread(std::thread* controllerThread_in) {
//cleanup previous (should never happen)
if (controllerThread != nullptr) {
controllerThread->join();
delete controllerThread;
}
controllerThread = controllerThread_in;
}
void AudioThread::bindThread(AudioThread *other) { void AudioThread::bindThread(AudioThread *other) {
std::lock_guard<std::recursive_mutex> lock(m_mutex); std::lock_guard<std::recursive_mutex> lock(m_mutex);
@ -62,9 +82,23 @@ void AudioThread::deviceCleanup() {
std::lock_guard<std::recursive_mutex> lock(m_device_mutex); std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
for (auto i = deviceController.begin(); i != deviceController.end(); i++) { auto it = deviceController.begin();
i->second->terminate();
std::cout << "Final audio management cleanup, terminating " << deviceController.size() << " device controllers..." << std::endl << std::flush;
while (it != deviceController.end()) {
//notify termination...
it->second->terminate();
//deletion of it->second will take care of the controllerThread:
delete it->second;
//next device
it++;
} }
std::cout << "Final audio management cleanup complete..." << std::endl << std::flush;
} }
static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status, static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status,
@ -76,8 +110,10 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
//actually active. //actually active.
::memset(out, 0, nBufferFrames * 2 * sizeof(float)); ::memset(out, 0, nBufferFrames * 2 * sizeof(float));
//src in the controller thread:
AudioThread *src = (AudioThread *) userData; AudioThread *src = (AudioThread *) userData;
//by construction, src is a controller thread, from deviceController:
std::lock_guard<std::recursive_mutex> lock(src->getMutex()); std::lock_guard<std::recursive_mutex> lock(src->getMutex());
if (src->isTerminated()) { if (src->isTerminated()) {
@ -88,14 +124,9 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned
std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl << std::flush; std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl << std::flush;
} }
if (src->boundThreads.empty()) {
return 0;
}
double peak = 0.0; double peak = 0.0;
//for all boundThreads //Process the bound threads audio:
for (size_t j = 0; j < src->boundThreads.size(); j++) { for (size_t j = 0; j < src->boundThreads.size(); j++) {
AudioThread *srcmix = src->boundThreads[j]; AudioThread *srcmix = src->boundThreads[j];
@ -280,7 +311,7 @@ void AudioThread::enumerateDevices(std::vector<RtAudio::DeviceInfo> &devs) {
void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
AudioThread* matchingAudioThread = nullptr; AudioThread* matchingControllerThread = nullptr;
//scope lock here to minimize the common unique static lock contention //scope lock here to minimize the common unique static lock contention
{ {
@ -288,46 +319,50 @@ void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) {
if (deviceController.find(deviceId) != deviceController.end()) { if (deviceController.find(deviceId) != deviceController.end()) {
matchingAudioThread = deviceController[deviceId]; matchingControllerThread = deviceController[deviceId];
} }
} }
//out-of-lock test //out-of-lock test
if (matchingAudioThread != nullptr) { if (matchingControllerThread != nullptr) {
AudioThreadCommand refreshDevice; AudioThreadCommand refreshDevice;
refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE; refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE;
refreshDevice.int_value = sampleRate; refreshDevice.int_value = sampleRate;
//VSO : blocking push ! //VSO : blocking push !
matchingAudioThread->getCommandQueue()->push(refreshDevice); matchingControllerThread->getCommandQueue()->push(refreshDevice);
} }
} }
void AudioThread::setSampleRate(int sampleRate) { void AudioThread::setSampleRate(int sampleRate) {
bool outputIsThis = false; bool thisIsAController = false;
//scope lock here to minimize the common unique static lock contention //scope lock here to minimize the common unique static lock contention
{ {
std::lock_guard<std::recursive_mutex> lock(m_device_mutex); std::lock_guard<std::recursive_mutex> lock(m_device_mutex);
if (deviceController[outputDevice.load()] == this) { if (deviceController[outputDevice.load()] == this) {
outputIsThis = true; thisIsAController = true;
deviceSampleRate[outputDevice.load()] = sampleRate; deviceSampleRate[outputDevice.load()] = sampleRate;
} }
} }
std::lock_guard<std::recursive_mutex> lock(m_mutex); std::lock_guard<std::recursive_mutex> lock(m_mutex);
if (outputIsThis) { if (thisIsAController) {
dac.stopStream(); dac.stopStream();
dac.closeStream(); dac.closeStream();
//Set bounded sample rate:
for (size_t j = 0; j < boundThreads.size(); j++) { for (size_t j = 0; j < boundThreads.size(); j++) {
AudioThread *srcmix = boundThreads[j]; AudioThread *srcmix = boundThreads[j];
// the controller thread is part of the boundedThreads, so prevent infinite recursion:
if (srcmix != this) {
srcmix->setSampleRate(sampleRate); srcmix->setSampleRate(sampleRate);
} }
}
//make a local copy, snapshot of the list of demodulators //make a local copy, snapshot of the list of demodulators
std::vector<DemodulatorInstancePtr> demodulators = wxGetApp().getDemodMgr().getDemodulators(); std::vector<DemodulatorInstancePtr> demodulators = wxGetApp().getDemodMgr().getDemodulators();
@ -364,6 +399,11 @@ void AudioThread::setupDevice(int deviceId) {
try { try {
if (deviceController.find(outputDevice.load()) != deviceController.end()) { if (deviceController.find(outputDevice.load()) != deviceController.end()) {
//'this' is not the controller, so remove it from the bounded list:
//beware, we must take the controller mutex, because the audio callback may use the list of bounded
//threads at that moment:
std::lock_guard<std::recursive_mutex> lock(deviceController[outputDevice.load()]->getMutex());
deviceController[outputDevice.load()]->removeThread(this); deviceController[outputDevice.load()]->removeThread(this);
} }
#ifndef _MSC_VER #ifndef _MSC_VER
@ -381,18 +421,29 @@ void AudioThread::setupDevice(int deviceId) {
// deviceSampleRate[parameters.deviceId] = sampleRate; // deviceSampleRate[parameters.deviceId] = sampleRate;
} }
//Create a new controller:
if (deviceController.find(parameters.deviceId) == deviceController.end()) { if (deviceController.find(parameters.deviceId) == deviceController.end()) {
//Create a new controller thread for parameters.deviceId:
deviceController[parameters.deviceId] = new AudioThread(); deviceController[parameters.deviceId] = new AudioThread();
deviceController[parameters.deviceId]->setInitOutputDevice(parameters.deviceId, sampleRate); deviceController[parameters.deviceId]->setInitOutputDevice(parameters.deviceId, sampleRate);
// BEWARE: the controller add itself to the list of boundThreads !
deviceController[parameters.deviceId]->bindThread(this); deviceController[parameters.deviceId]->bindThread(this);
deviceThread[parameters.deviceId] = new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId]); deviceController[parameters.deviceId]->attachControllerThread(new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId]));
} else if (deviceController[parameters.deviceId] == this) { } else if (deviceController[parameters.deviceId] == this) {
//Attach callback //Attach callback
dac.openStream(&parameters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *) this, &opts); dac.openStream(&parameters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *)this, &opts);
dac.startStream(); dac.startStream();
} else { } else {
//we are a bound thread, add ourselves to the controller deviceController[parameters.deviceId].
//beware, we must take the controller mutex, because the audio callback may use the list of bounded
//threads at that moment:
std::lock_guard<std::recursive_mutex> lock(deviceController[parameters.deviceId]->getMutex());
deviceController[parameters.deviceId]->bindThread(this); deviceController[parameters.deviceId]->bindThread(this);
} }
active = true; active = true;
@ -467,17 +518,13 @@ void AudioThread::run() {
if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE) { if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE) {
setSampleRate(command.int_value); setSampleRate(command.int_value);
} }
} } //end while
// Drain any remaining inputs, with a non-blocking pop // Drain any remaining inputs, with a non-blocking pop
if (inputQueue != nullptr) { if (inputQueue != nullptr) {
inputQueue->flush(); inputQueue->flush();
} }
//Thread termination, prevent fancy things to happen, lock the whole thing:
//This way audioThreadCallback is rightly protected from thread termination
std::lock_guard<std::recursive_mutex> lock(m_mutex);
//Nullify currentInput... //Nullify currentInput...
currentInput = nullptr; currentInput = nullptr;
@ -485,8 +532,14 @@ void AudioThread::run() {
std::lock_guard<std::recursive_mutex> global_lock(m_device_mutex); std::lock_guard<std::recursive_mutex> global_lock(m_device_mutex);
if (deviceController[parameters.deviceId] != this) { if (deviceController[parameters.deviceId] != this) {
//'this' is not the controller, so remove it from the bounded list:
//beware, we must take the controller mutex, because the audio callback may use the list of bounded
//threads at that moment:
std::lock_guard<std::recursive_mutex> lock(deviceController[parameters.deviceId]->getMutex());
deviceController[parameters.deviceId]->removeThread(this); deviceController[parameters.deviceId]->removeThread(this);
} else { } else {
// 'this' is a controller thread:
try { try {
if (dac.isStreamOpen()) { if (dac.isStreamOpen()) {
if (dac.isStreamRunning()) { if (dac.isStreamRunning()) {
@ -514,7 +567,9 @@ bool AudioThread::isActive() {
void AudioThread::setActive(bool state) { void AudioThread::setActive(bool state) {
AudioThread* matchingAudioThread = nullptr; AudioThread* matchingControllerThread = nullptr;
std::lock_guard<std::recursive_mutex> lock(m_mutex);
//scope lock here to minimize the common unique static lock contention //scope lock here to minimize the common unique static lock contention
{ {
@ -522,20 +577,18 @@ void AudioThread::setActive(bool state) {
if (deviceController.find(parameters.deviceId) != deviceController.end()) { if (deviceController.find(parameters.deviceId) != deviceController.end()) {
matchingAudioThread = deviceController[parameters.deviceId]; matchingControllerThread = deviceController[parameters.deviceId];
} }
} }
std::lock_guard<std::recursive_mutex> lock(m_mutex); if (matchingControllerThread == nullptr) {
if (matchingAudioThread == nullptr) {
return; return;
} }
if (state && !active && inputQueue) { if (state && !active && inputQueue) {
matchingAudioThread->bindThread(this); matchingControllerThread->bindThread(this);
} else if (!state && active) { } else if (!state && active) {
matchingAudioThread->removeThread(this); matchingControllerThread->removeThread(this);
} }
// Activity state changing, clear any inputs // Activity state changing, clear any inputs

View File

@ -8,6 +8,7 @@
#include <map> #include <map>
#include <string> #include <string>
#include <atomic> #include <atomic>
#include <algorithm>
#include <memory> #include <memory>
#include "ThreadBlockingQueue.h" #include "ThreadBlockingQueue.h"
#include "RtAudio.h" #include "RtAudio.h"
@ -110,6 +111,9 @@ public:
static void deviceCleanup(); static void deviceCleanup();
static void setDeviceSampleRate(int deviceId, int sampleRate); static void setDeviceSampleRate(int deviceId, int sampleRate);
//
void attachControllerThread(std::thread* controllerThread);
//fields below, only to be used by other AudioThreads ! //fields below, only to be used by other AudioThreads !
size_t underflowCount; size_t underflowCount;
//protected by m_mutex //protected by m_mutex
@ -131,6 +135,9 @@ private:
AudioThreadCommandQueue cmdQueue; AudioThreadCommandQueue cmdQueue;
int sampleRate; int sampleRate;
//if != nullptr, it mean AudioThread is a controller thread.
std::thread* controllerThread = nullptr;
//The own m_mutex protecting this AudioThread, in particular boundThreads //The own m_mutex protecting this AudioThread, in particular boundThreads
std::recursive_mutex m_mutex; std::recursive_mutex m_mutex;
@ -140,10 +147,8 @@ private:
void bindThread(AudioThread *other); void bindThread(AudioThread *other);
void removeThread(AudioThread *other); void removeThread(AudioThread *other);
static std::map<int, AudioThread *> deviceController; static std::map<int, AudioThread* > deviceController;
static std::map<int, std::thread *> deviceThread;
//The mutex protecting static deviceController, deviceThread and deviceSampleRate access. //The mutex protecting static deviceController, deviceThread and deviceSampleRate access.
static std::recursive_mutex m_device_mutex; static std::recursive_mutex m_device_mutex;
}; };