From 7007e8f6aac731bc6846c01d8046d1721816db07 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Thu, 6 Feb 2020 23:59:09 +0100 Subject: [PATCH] Fixed an crash within the audio API and better usage of input devices --- .../src/audio/AudioDevice.cpp | 8 +- .../serverconnection/src/audio/AudioInput.cpp | 264 ++++++++++++------ .../serverconnection/src/audio/AudioInput.h | 23 +- .../src/audio/AudioMerger.cpp | 6 +- .../src/audio/js/AudioRecorder.cpp | 23 +- .../src/connection/ft/FileTransferObject.cpp | 4 +- native/serverconnection/test/js/audio.ts | 114 ++++++++ 7 files changed, 322 insertions(+), 120 deletions(-) create mode 100644 native/serverconnection/test/js/audio.ts diff --git a/native/serverconnection/src/audio/AudioDevice.cpp b/native/serverconnection/src/audio/AudioDevice.cpp index ec9e7a4..a4b7fc8 100644 --- a/native/serverconnection/src/audio/AudioDevice.cpp +++ b/native/serverconnection/src/audio/AudioDevice.cpp @@ -9,21 +9,21 @@ extern bool devices_cached(); /* if the result is false then the call to devices extern std::deque> devices(); bool _devices_cached = false; -mutex _audio_devices_lock; -deque> _audio_devices{}; +std::mutex _audio_devices_lock; +std::deque> _audio_devices{}; bool audio::devices_cached() { return _devices_cached; } void audio::clear_device_cache() { - lock_guard lock(_audio_devices_lock); + std::lock_guard lock(_audio_devices_lock); _audio_devices.clear(); _devices_cached = false; } deque> audio::devices() { - lock_guard lock(_audio_devices_lock); + std::lock_guard lock(_audio_devices_lock); if(_devices_cached) return _audio_devices; diff --git a/native/serverconnection/src/audio/AudioInput.cpp b/native/serverconnection/src/audio/AudioInput.cpp index 0af838f..9e017e1 100644 --- a/native/serverconnection/src/audio/AudioInput.cpp +++ b/native/serverconnection/src/audio/AudioInput.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "AudioInput.h" #include "AudioReframer.h" #include "../logger.h" @@ -8,6 +9,150 @@ using namespace std; using namespace tc; using namespace tc::audio; +class AudioInputSource { + public: + constexpr static auto kChannelCount{2}; + constexpr static auto kSampleRate{48000}; + + explicit AudioInputSource(PaHostApiIndex index) : device_index{index} {} + ~AudioInputSource() = default; + + /* its blocking! */ + void register_consumer(AudioInput* consumer) { + std::lock_guard lock{this->registered_inputs_lock}; + if(find(this->registered_inputs.begin(), this->registered_inputs.end(), consumer) != this->registered_inputs.end()) + return; + + this->registered_inputs.push_back(consumer); + } + + /* its blocking */ + void remove_consumer(AudioInput* consumer) { + std::lock_guard lock{this->registered_inputs_lock}; + + auto index = find(this->registered_inputs.begin(), this->registered_inputs.end(), consumer); + if(index == this->registered_inputs.end()) + return; + + this->registered_inputs.erase(index); + if(!this->registered_inputs.empty()) + return; + } + + /* this could take a bit longer! */ + bool begin_recording(std::string& error) { + std::lock_guard lock{this->state_lock}; + if(this->state == RECORDING) return true; + + if(this->state != STOPPED) { + if(this->state == DELETED) { + error = "stream has been deleted"; + return false; + } + error = "invalid state"; + return false; + } + + this->current_device = Pa_GetDeviceInfo(this->device_index); + if(!this->current_device) { + error = "failed to get device info"; + return false; + } + + PaStreamParameters parameters{}; + memset(¶meters, 0, sizeof(parameters)); + parameters.channelCount = (int) kChannelCount; + parameters.device = this->device_index; + parameters.sampleFormat = paFloat32; + parameters.suggestedLatency = this->current_device->defaultLowOutputLatency; + auto err = Pa_OpenStream( + &this->input_stream, + ¶meters, + nullptr, + (double) kSampleRate, + paFramesPerBufferUnspecified, + paClipOff, + &AudioInputSource::pa_audio_callback, + this); + + if(err != paNoError) { + this->input_stream = nullptr; + error = to_string(err) + "/" + Pa_GetErrorText(err); + return false; + } + + err = Pa_StartStream(this->input_stream); + if(err != paNoError) { + error = "recording failed " + to_string(err) + "/" + Pa_GetErrorText(err); + err = Pa_CloseStream(this->input_stream); + if(err != paNoError) + log_critical(category::audio, tr("Failed to close opened pa stream. This will cause memory leaks. Error: {}/{}"), err, Pa_GetErrorText(err)); + return false; + } + this->state = RECORDING; + + return true; + } + + void stop_recording_if_possible() { + std::lock_guard lock{this->state_lock}; + if(this->state != RECORDING) return; + + { + std::lock_guard client_lock{this->registered_inputs_lock}; + if(!this->registered_inputs.empty()) return; + } + this->state = STOPPED; + + if(Pa_IsStreamActive(this->input_stream)) + Pa_AbortStream(this->input_stream); + + auto error = Pa_CloseStream(this->input_stream); + if(error != paNoError) + log_error(category::audio, tr("Failed to close PA stream: {}"), error); + this->input_stream = nullptr; + } + + const PaDeviceIndex device_index; + private: + static int pa_audio_callback(const void *input, void *output, unsigned long frameCount, const PaStreamCallbackTimeInfo* timeInfo, PaStreamCallbackFlags statusFlags, void* _input_source) { + if(!input) return 0; /* this should never happen */ + auto input_source = (AudioInputSource*) _input_source; + + std::lock_guard lock{input_source->registered_inputs_lock}; + for(auto& client : input_source->registered_inputs) + client->audio_callback(input, frameCount, timeInfo, statusFlags); + return 0; + } + + std::mutex state_lock{}; + enum _state { + STOPPED, + RECORDING, + DELETED + } state{STOPPED}; + PaStream* input_stream{nullptr}; + const PaDeviceInfo* current_device = nullptr; + + std::mutex registered_inputs_lock{}; + std::vector registered_inputs{}; +}; +std::mutex input_sources_lock{}; +static std::deque> input_sources{}; + +std::shared_ptr get_input_source(PaDeviceIndex device_index, bool create = true) { + std::lock_guard sources_lock{input_sources_lock}; + for(const auto& input : input_sources) + if(input->device_index == device_index) + return input; + if(!create) + return nullptr; + + auto input = std::make_shared(device_index); + input_sources.push_back(std::make_shared(device_index)); + return input; +} + AudioConsumer::AudioConsumer(tc::audio::AudioInput *handle, size_t channel_count, size_t sample_rate, size_t frame_size) : handle(handle), channel_count(channel_count), @@ -44,91 +189,50 @@ AudioInput::~AudioInput() { consumer->handle = nullptr; } -bool AudioInput::open_device(std::string& error, PaDeviceIndex index) { - lock_guard lock(this->input_stream_lock); +PaDeviceIndex AudioInput::current_device() { + lock_guard lock(this->input_source_lock); + return this->input_source ? this->input_source->device_index : paNoDevice; +} - if(index == this->_current_device_index) +bool AudioInput::open_device(std::string& error, PaDeviceIndex index) { + lock_guard lock(this->input_source_lock); + + if(index == (this->input_source ? this->input_source->device_index : paNoDevice)) return true; this->close_device(); - this->_current_device_index = index; if(index == paNoDevice) return true; - this->_current_device = Pa_GetDeviceInfo(index); - if(!this->_current_device) { - this->_current_device_index = paNoDevice; - error = "failed to get device info"; - return false; - } - - PaStreamParameters parameters{}; - memset(¶meters, 0, sizeof(parameters)); - parameters.channelCount = (int) this->_channel_count; - parameters.device = this->_current_device_index; - parameters.sampleFormat = paFloat32; - parameters.suggestedLatency = this->_current_device->defaultLowOutputLatency; - auto err = Pa_OpenStream( - &this->input_stream, - ¶meters, - nullptr, - (double) this->_sample_rate, - paFramesPerBufferUnspecified, - paClipOff, - &AudioInput::_audio_callback, - this); - - if(err != paNoError) { - this->input_stream = nullptr; - error = to_string(err) + "/" + Pa_GetErrorText(err); - return false; - } - - return true; -} - -bool AudioInput::record() { - lock_guard lock(this->input_stream_lock); - if(!this->input_stream) - return false; - - if(Pa_IsStreamActive(this->input_stream)) - return true; - - auto err = Pa_StartStream(this->input_stream); - if(err != paNoError && err != paStreamIsNotStopped) { - log_error(category::audio, tr("Pa_StartStream returned {}"), err); - return false; - } - - return true; -} - -bool AudioInput::recording() { - lock_guard lock(this->input_stream_lock); - return this->input_stream && Pa_IsStreamActive(this->input_stream); -} - -void AudioInput::stop() { - lock_guard lock(this->input_stream_lock); - if(this->input_stream) { - if(Pa_IsStreamActive(this->input_stream)) - Pa_AbortStream(this->input_stream); - } + this->input_source = get_input_source(index, true); + this->input_source->register_consumer(this); + return this->input_source->begin_recording(error); } void AudioInput::close_device() { - lock_guard lock(this->input_stream_lock); - if(this->input_stream) { - if(Pa_IsStreamActive(this->input_stream)) - Pa_AbortStream(this->input_stream); + lock_guard lock(this->input_source_lock); + if(this->input_source) { + this->input_source->remove_consumer(this); + this->input_source->stop_recording_if_possible(); + this->input_source.reset(); + } + this->input_recording = false; +} - auto error = Pa_CloseStream(this->input_stream); - if(error != paNoError) - log_error(category::audio, tr("Failed to close PA stream: {}"), error); - this->input_stream = nullptr; - this_thread::sleep_for(chrono::seconds{1}); - } +bool AudioInput::record() { + lock_guard lock(this->input_source_lock); + if(!this->input_source) return false; + + this->input_recording = true; + return true; +} + +bool AudioInput::recording() { + return this->input_recording; +} + +void AudioInput::stop() { + this->input_recording = false; } std::shared_ptr AudioInput::create_consumer(size_t frame_length) { @@ -151,15 +255,10 @@ void AudioInput::delete_consumer(const std::shared_ptr &source) { source->handle = nullptr; } -int AudioInput::_audio_callback(const void *a, void *b, unsigned long c, const PaStreamCallbackTimeInfo* d, PaStreamCallbackFlags e, void *_ptr_audio_output) { - return reinterpret_cast(_ptr_audio_output)->audio_callback(a, b, c, d, e); -} +void AudioInput::audio_callback(const void *input, unsigned long frameCount, const PaStreamCallbackTimeInfo* timeInfo, PaStreamCallbackFlags statusFlags) { + if(!this->input_recording) return; -int AudioInput::audio_callback(const void *input, void *output, unsigned long frameCount, const PaStreamCallbackTimeInfo* timeInfo, PaStreamCallbackFlags statusFlags) { - if (!input) /* hmmm.. suspicious */ - return 0; - - if(this->_volume != 1) { + if(this->_volume != 1 && false) { auto ptr = (float*) input; auto left = frameCount * this->_channel_count; while(left-- > 0) @@ -175,5 +274,4 @@ int AudioInput::audio_callback(const void *input, void *output, unsigned long fr if(ms > 5) { log_warn(category::audio, tr("Processing of audio input needed {}ms. This could be an issue!"), chrono::duration_cast(end - begin).count()); } - return 0; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/AudioInput.h b/native/serverconnection/src/audio/AudioInput.h index d441272..9bb4e17 100644 --- a/native/serverconnection/src/audio/AudioInput.h +++ b/native/serverconnection/src/audio/AudioInput.h @@ -9,6 +9,7 @@ #include #include "AudioSamples.h" +class AudioInputSource; namespace tc { namespace audio { class AudioInput; @@ -39,16 +40,18 @@ namespace tc { }; class AudioInput { + friend class ::AudioInputSource; public: AudioInput(size_t /* channels */, size_t /* rate */); virtual ~AudioInput(); - bool open_device(std::string& /* error */, PaDeviceIndex); - bool record(); - bool recording(); + [[nodiscard]] bool open_device(std::string& /* error */, PaDeviceIndex); + [[nodiscard]] PaDeviceIndex current_device(); + void close_device(); + + [[nodiscard]] bool record(); + [[nodiscard]] bool recording(); void stop(); - void close_device(); - PaDeviceIndex current_device() { return this->_current_device_index; } std::deque> consumers() { std::lock_guard lock(this->consumers_lock); @@ -64,8 +67,7 @@ namespace tc { inline float volume() { return this->_volume; } inline void set_volume(float value) { this->_volume = value; } private: - static int _audio_callback(const void *, void *, unsigned long, const PaStreamCallbackTimeInfo*, PaStreamCallbackFlags, void*); - int audio_callback(const void *, void *, unsigned long, const PaStreamCallbackTimeInfo*, PaStreamCallbackFlags); + void audio_callback(const void *, unsigned long, const PaStreamCallbackTimeInfo*, PaStreamCallbackFlags); size_t const _channel_count; size_t const _sample_rate; @@ -73,10 +75,9 @@ namespace tc { std::mutex consumers_lock; std::deque> _consumers; - std::recursive_mutex input_stream_lock; - const PaDeviceInfo* _current_device = nullptr; - PaDeviceIndex _current_device_index = paNoDevice; - PaStream* input_stream = nullptr; + std::recursive_mutex input_source_lock; + bool input_recording{false}; + std::shared_ptr<::AudioInputSource> input_source{}; float _volume = 1.f; }; diff --git a/native/serverconnection/src/audio/AudioMerger.cpp b/native/serverconnection/src/audio/AudioMerger.cpp index b1f2e63..f075ff3 100644 --- a/native/serverconnection/src/audio/AudioMerger.cpp +++ b/native/serverconnection/src/audio/AudioMerger.cpp @@ -7,7 +7,7 @@ using namespace std; using namespace tc::audio; /* technique based on http://www.vttoth.com/CMS/index.php/technical-notes/68 */ -inline float merge_ab(float a, float b) { +inline constexpr float merge_ab(float a, float b) { /* * Form: A,B := [0;n] * Z = 2(A + B) - (2/n) * A * B - n @@ -21,6 +21,10 @@ inline float merge_ab(float a, float b) { return (2 * (a + b) - a * b - 2) - 1; } +static_assert(merge_ab(1, 0) == 1); +static_assert(merge_ab(0, 1) == 1); +static_assert(merge_ab(1, 1) == 1); + bool merge::merge_sources(void *_dest, void *_src_a, void *_src_b, size_t channels, size_t samples) { auto dest = (float*) _dest; auto src_a = (float*) _src_a; diff --git a/native/serverconnection/src/audio/js/AudioRecorder.cpp b/native/serverconnection/src/audio/js/AudioRecorder.cpp index 4816d6e..1153070 100644 --- a/native/serverconnection/src/audio/js/AudioRecorder.cpp +++ b/native/serverconnection/src/audio/js/AudioRecorder.cpp @@ -172,26 +172,11 @@ NAN_METHOD(AudioRecorderWrapper::_start) { return; } - unique_ptr> _callback = make_unique>(info[0].As()); - unique_ptr> _recorder = make_unique>(info.Holder()); + auto input = ObjectWrap::Unwrap(info.Holder())->_input; - auto _async_callback = Nan::async_callback([call = std::move(_callback), recorder = move(_recorder)](bool result) mutable { - Nan::HandleScope scope; - auto callback_function = call->Get(Nan::GetCurrentContext()->GetIsolate()); - - v8::Local argv[1]; - argv[0] = v8::Boolean::New(Nan::GetCurrentContext()->GetIsolate(), result); - callback_function->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv); - - recorder->Reset(); - call->Reset(); - }).option_destroyed_execute(true); - - auto handle = ObjectWrap::Unwrap(info.Holder()); - auto input = handle->_input; - std::thread([_async_callback, input]{ - _async_callback(input->record()); - }).detach(); + v8::Local argv[1]; + argv[0] = v8::Boolean::New(Nan::GetCurrentContext()->GetIsolate(), input->record()); + (void) info[0].As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv); } NAN_METHOD(AudioRecorderWrapper::_started) { diff --git a/native/serverconnection/src/connection/ft/FileTransferObject.cpp b/native/serverconnection/src/connection/ft/FileTransferObject.cpp index 0f954cd..153bf40 100644 --- a/native/serverconnection/src/connection/ft/FileTransferObject.cpp +++ b/native/serverconnection/src/connection/ft/FileTransferObject.cpp @@ -159,11 +159,11 @@ void TransferObjectWrap::do_wrap(v8::Local object) { auto direction = source ? "upload" : "download"; Nan::Set(object, Nan::New("direction").ToLocalChecked(), - v8::String::NewFromUtf8(Nan::GetCurrentContext()->GetIsolate(), direction).ToLocalChecked() + Nan::New(direction).ToLocalChecked() ); Nan::Set(object, Nan::New("name").ToLocalChecked(), - v8::String::NewFromUtf8(Nan::GetCurrentContext()->GetIsolate(), this->target()->name().c_str()).ToLocalChecked() + Nan::New(this->target()->name().c_str()).ToLocalChecked() ); if(source) { diff --git a/native/serverconnection/test/js/audio.ts b/native/serverconnection/test/js/audio.ts new file mode 100644 index 0000000..016d171 --- /dev/null +++ b/native/serverconnection/test/js/audio.ts @@ -0,0 +1,114 @@ +/// +console.log("HELLO WORLD"); +module.paths.push("../../build/linux_x64"); +module.paths.push("../../build/win32_64"); + +//LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libasan.so.5 +const os = require('os'); +//process.dlopen(module, '/usr/lib/x86_64-linux-gnu/libasan.so.5', +// os.constants.dlopen.RTLD_NOW); +import * as fs from "fs"; + +const original_require = require; +require = (module => original_require(__dirname + "/../../../build/linux_x64/" + module + ".node")) as any; +import * as handle from "teaclient_connection"; +require = original_require; + +const connection_list = []; +const connection = handle.spawn_server_connection(); +const client_list = []; + +console.dir(handle); +console.log("Query devices..."); +console.log("Devices: %o", handle.audio.available_devices()); +console.log("Current playback device: %o", handle.audio.playback.current_device()); +//handle.audio.playback.set_device(14); +//console.log("Current playback device: %o", handle.audio.playback.current_device()); + +const stream = handle.audio.playback.create_stream(); +console.log("Own stream: %o", stream); + + +for(let i = 0; i < 12; i++) { + const recorder = handle.audio.record.create_recorder(); + for(const device of handle.audio.available_devices()) { + if(!device.input_supported) + continue; + + if(device.name != "pulse") + continue; + + console.log("Found pulse at %o", device.device_index); + recorder.set_device(device.device_index, () => { + recorder.start(flag => console.log("X: " + flag)); + const consumer = recorder.create_consumer(); + consumer.create_filter_threshold(2); + }); + break; + } +} + +/* -1 => default device */ +const recorder = handle.audio.record.create_recorder(); +console.log("Have device: %o", recorder); +console.log("Device: %o", recorder.get_device()); +if(recorder.get_device() == -1) { + console.log("Looking for devices"); + for(const device of handle.audio.available_devices()) { + if(!device.input_supported) + continue; + + if(device.name != "pulse") + continue; + + console.log("Found pulse at %o", device.device_index); + recorder.set_device(device.device_index, () => {}); + } +} +console.log("Device: %o", recorder.get_device()); +recorder.start(() => {}); +console.log("Started: %o", recorder.started()); + +const consumer = recorder.create_consumer(); + +{ + const filter = consumer.create_filter_threshold(.5); + filter.set_margin_frames(10); + /* + filter.set_analyze_filter(value => { + console.log(value); + }) + */ +} + +{ + //const filter = consumer.create_filter_vad(3); + //console.log("Filter name: %s; Filter level: %d; Filter margin: %d", filter.get_name(), filter.get_level(), filter.get_margin_frames()); +} + +{ + const consume = consumer.create_filter_state(); + setTimeout(() => { + console.log("Silence now!"); + consume.set_consuming(true); + + setTimeout(() => { + console.log("Speak now!"); + consume.set_consuming(false); + }, 1000); + }, 1000); +} + +setInterval(() => { + if("gc" in global) { + console.log("GC"); + global.gc(); + } +}, 1000); + +let a_map = [consumer, recorder]; +/* keep the object alive */ +setTimeout(() => { + connection.connected(); + a_map = a_map.filter(e => true); +}, 1000);