Fixed an crash within the audio API and better usage of input devices

This commit is contained in:
WolverinDEV 2020-02-06 23:59:09 +01:00
parent 25173d5745
commit 7007e8f6aa
7 changed files with 322 additions and 120 deletions

View File

@ -9,21 +9,21 @@ extern bool devices_cached(); /* if the result is false then the call to devices
extern std::deque<std::shared_ptr<AudioDevice>> devices();
bool _devices_cached = false;
mutex _audio_devices_lock;
deque<shared_ptr<AudioDevice>> _audio_devices{};
std::mutex _audio_devices_lock;
std::deque<std::shared_ptr<AudioDevice>> _audio_devices{};
bool audio::devices_cached() {
return _devices_cached;
}
void audio::clear_device_cache() {
lock_guard lock(_audio_devices_lock);
std::lock_guard lock(_audio_devices_lock);
_audio_devices.clear();
_devices_cached = false;
}
deque<shared_ptr<AudioDevice>> audio::devices() {
lock_guard lock(_audio_devices_lock);
std::lock_guard lock(_audio_devices_lock);
if(_devices_cached)
return _audio_devices;

View File

@ -1,5 +1,6 @@
#include <cstring>
#include <string>
#include <misc/spin_lock.h>
#include "AudioInput.h"
#include "AudioReframer.h"
#include "../logger.h"
@ -8,6 +9,150 @@ using namespace std;
using namespace tc;
using namespace tc::audio;
class AudioInputSource {
public:
constexpr static auto kChannelCount{2};
constexpr static auto kSampleRate{48000};
explicit AudioInputSource(PaHostApiIndex index) : device_index{index} {}
~AudioInputSource() = default;
/* its blocking! */
void register_consumer(AudioInput* consumer) {
std::lock_guard lock{this->registered_inputs_lock};
if(find(this->registered_inputs.begin(), this->registered_inputs.end(), consumer) != this->registered_inputs.end())
return;
this->registered_inputs.push_back(consumer);
}
/* its blocking */
void remove_consumer(AudioInput* consumer) {
std::lock_guard lock{this->registered_inputs_lock};
auto index = find(this->registered_inputs.begin(), this->registered_inputs.end(), consumer);
if(index == this->registered_inputs.end())
return;
this->registered_inputs.erase(index);
if(!this->registered_inputs.empty())
return;
}
/* this could take a bit longer! */
bool begin_recording(std::string& error) {
std::lock_guard lock{this->state_lock};
if(this->state == RECORDING) return true;
if(this->state != STOPPED) {
if(this->state == DELETED) {
error = "stream has been deleted";
return false;
}
error = "invalid state";
return false;
}
this->current_device = Pa_GetDeviceInfo(this->device_index);
if(!this->current_device) {
error = "failed to get device info";
return false;
}
PaStreamParameters parameters{};
memset(&parameters, 0, sizeof(parameters));
parameters.channelCount = (int) kChannelCount;
parameters.device = this->device_index;
parameters.sampleFormat = paFloat32;
parameters.suggestedLatency = this->current_device->defaultLowOutputLatency;
auto err = Pa_OpenStream(
&this->input_stream,
&parameters,
nullptr,
(double) kSampleRate,
paFramesPerBufferUnspecified,
paClipOff,
&AudioInputSource::pa_audio_callback,
this);
if(err != paNoError) {
this->input_stream = nullptr;
error = to_string(err) + "/" + Pa_GetErrorText(err);
return false;
}
err = Pa_StartStream(this->input_stream);
if(err != paNoError) {
error = "recording failed " + to_string(err) + "/" + Pa_GetErrorText(err);
err = Pa_CloseStream(this->input_stream);
if(err != paNoError)
log_critical(category::audio, tr("Failed to close opened pa stream. This will cause memory leaks. Error: {}/{}"), err, Pa_GetErrorText(err));
return false;
}
this->state = RECORDING;
return true;
}
void stop_recording_if_possible() {
std::lock_guard lock{this->state_lock};
if(this->state != RECORDING) return;
{
std::lock_guard client_lock{this->registered_inputs_lock};
if(!this->registered_inputs.empty()) return;
}
this->state = STOPPED;
if(Pa_IsStreamActive(this->input_stream))
Pa_AbortStream(this->input_stream);
auto error = Pa_CloseStream(this->input_stream);
if(error != paNoError)
log_error(category::audio, tr("Failed to close PA stream: {}"), error);
this->input_stream = nullptr;
}
const PaDeviceIndex device_index;
private:
static int pa_audio_callback(const void *input, void *output, unsigned long frameCount, const PaStreamCallbackTimeInfo* timeInfo, PaStreamCallbackFlags statusFlags, void* _input_source) {
if(!input) return 0; /* this should never happen */
auto input_source = (AudioInputSource*) _input_source;
std::lock_guard lock{input_source->registered_inputs_lock};
for(auto& client : input_source->registered_inputs)
client->audio_callback(input, frameCount, timeInfo, statusFlags);
return 0;
}
std::mutex state_lock{};
enum _state {
STOPPED,
RECORDING,
DELETED
} state{STOPPED};
PaStream* input_stream{nullptr};
const PaDeviceInfo* current_device = nullptr;
std::mutex registered_inputs_lock{};
std::vector<AudioInput*> registered_inputs{};
};
std::mutex input_sources_lock{};
static std::deque<std::shared_ptr<AudioInputSource>> input_sources{};
std::shared_ptr<AudioInputSource> get_input_source(PaDeviceIndex device_index, bool create = true) {
std::lock_guard sources_lock{input_sources_lock};
for(const auto& input : input_sources)
if(input->device_index == device_index)
return input;
if(!create)
return nullptr;
auto input = std::make_shared<AudioInputSource>(device_index);
input_sources.push_back(std::make_shared<AudioInputSource>(device_index));
return input;
}
AudioConsumer::AudioConsumer(tc::audio::AudioInput *handle, size_t channel_count, size_t sample_rate, size_t frame_size) :
handle(handle),
channel_count(channel_count),
@ -44,91 +189,50 @@ AudioInput::~AudioInput() {
consumer->handle = nullptr;
}
bool AudioInput::open_device(std::string& error, PaDeviceIndex index) {
lock_guard lock(this->input_stream_lock);
PaDeviceIndex AudioInput::current_device() {
lock_guard lock(this->input_source_lock);
return this->input_source ? this->input_source->device_index : paNoDevice;
}
if(index == this->_current_device_index)
bool AudioInput::open_device(std::string& error, PaDeviceIndex index) {
lock_guard lock(this->input_source_lock);
if(index == (this->input_source ? this->input_source->device_index : paNoDevice))
return true;
this->close_device();
this->_current_device_index = index;
if(index == paNoDevice)
return true;
this->_current_device = Pa_GetDeviceInfo(index);
if(!this->_current_device) {
this->_current_device_index = paNoDevice;
error = "failed to get device info";
return false;
}
PaStreamParameters parameters{};
memset(&parameters, 0, sizeof(parameters));
parameters.channelCount = (int) this->_channel_count;
parameters.device = this->_current_device_index;
parameters.sampleFormat = paFloat32;
parameters.suggestedLatency = this->_current_device->defaultLowOutputLatency;
auto err = Pa_OpenStream(
&this->input_stream,
&parameters,
nullptr,
(double) this->_sample_rate,
paFramesPerBufferUnspecified,
paClipOff,
&AudioInput::_audio_callback,
this);
if(err != paNoError) {
this->input_stream = nullptr;
error = to_string(err) + "/" + Pa_GetErrorText(err);
return false;
}
return true;
}
bool AudioInput::record() {
lock_guard lock(this->input_stream_lock);
if(!this->input_stream)
return false;
if(Pa_IsStreamActive(this->input_stream))
return true;
auto err = Pa_StartStream(this->input_stream);
if(err != paNoError && err != paStreamIsNotStopped) {
log_error(category::audio, tr("Pa_StartStream returned {}"), err);
return false;
}
return true;
}
bool AudioInput::recording() {
lock_guard lock(this->input_stream_lock);
return this->input_stream && Pa_IsStreamActive(this->input_stream);
}
void AudioInput::stop() {
lock_guard lock(this->input_stream_lock);
if(this->input_stream) {
if(Pa_IsStreamActive(this->input_stream))
Pa_AbortStream(this->input_stream);
}
this->input_source = get_input_source(index, true);
this->input_source->register_consumer(this);
return this->input_source->begin_recording(error);
}
void AudioInput::close_device() {
lock_guard lock(this->input_stream_lock);
if(this->input_stream) {
if(Pa_IsStreamActive(this->input_stream))
Pa_AbortStream(this->input_stream);
lock_guard lock(this->input_source_lock);
if(this->input_source) {
this->input_source->remove_consumer(this);
this->input_source->stop_recording_if_possible();
this->input_source.reset();
}
this->input_recording = false;
}
auto error = Pa_CloseStream(this->input_stream);
if(error != paNoError)
log_error(category::audio, tr("Failed to close PA stream: {}"), error);
this->input_stream = nullptr;
this_thread::sleep_for(chrono::seconds{1});
}
bool AudioInput::record() {
lock_guard lock(this->input_source_lock);
if(!this->input_source) return false;
this->input_recording = true;
return true;
}
bool AudioInput::recording() {
return this->input_recording;
}
void AudioInput::stop() {
this->input_recording = false;
}
std::shared_ptr<AudioConsumer> AudioInput::create_consumer(size_t frame_length) {
@ -151,15 +255,10 @@ void AudioInput::delete_consumer(const std::shared_ptr<AudioConsumer> &source) {
source->handle = nullptr;
}
int AudioInput::_audio_callback(const void *a, void *b, unsigned long c, const PaStreamCallbackTimeInfo* d, PaStreamCallbackFlags e, void *_ptr_audio_output) {
return reinterpret_cast<AudioInput*>(_ptr_audio_output)->audio_callback(a, b, c, d, e);
}
void AudioInput::audio_callback(const void *input, unsigned long frameCount, const PaStreamCallbackTimeInfo* timeInfo, PaStreamCallbackFlags statusFlags) {
if(!this->input_recording) return;
int AudioInput::audio_callback(const void *input, void *output, unsigned long frameCount, const PaStreamCallbackTimeInfo* timeInfo, PaStreamCallbackFlags statusFlags) {
if (!input) /* hmmm.. suspicious */
return 0;
if(this->_volume != 1) {
if(this->_volume != 1 && false) {
auto ptr = (float*) input;
auto left = frameCount * this->_channel_count;
while(left-- > 0)
@ -175,5 +274,4 @@ int AudioInput::audio_callback(const void *input, void *output, unsigned long fr
if(ms > 5) {
log_warn(category::audio, tr("Processing of audio input needed {}ms. This could be an issue!"), chrono::duration_cast<chrono::milliseconds>(end - begin).count());
}
return 0;
}

View File

@ -9,6 +9,7 @@
#include <misc/spin_lock.h>
#include "AudioSamples.h"
class AudioInputSource;
namespace tc {
namespace audio {
class AudioInput;
@ -39,16 +40,18 @@ namespace tc {
};
class AudioInput {
friend class ::AudioInputSource;
public:
AudioInput(size_t /* channels */, size_t /* rate */);
virtual ~AudioInput();
bool open_device(std::string& /* error */, PaDeviceIndex);
bool record();
bool recording();
[[nodiscard]] bool open_device(std::string& /* error */, PaDeviceIndex);
[[nodiscard]] PaDeviceIndex current_device();
void close_device();
[[nodiscard]] bool record();
[[nodiscard]] bool recording();
void stop();
void close_device();
PaDeviceIndex current_device() { return this->_current_device_index; }
std::deque<std::shared_ptr<AudioConsumer>> consumers() {
std::lock_guard lock(this->consumers_lock);
@ -64,8 +67,7 @@ namespace tc {
inline float volume() { return this->_volume; }
inline void set_volume(float value) { this->_volume = value; }
private:
static int _audio_callback(const void *, void *, unsigned long, const PaStreamCallbackTimeInfo*, PaStreamCallbackFlags, void*);
int audio_callback(const void *, void *, unsigned long, const PaStreamCallbackTimeInfo*, PaStreamCallbackFlags);
void audio_callback(const void *, unsigned long, const PaStreamCallbackTimeInfo*, PaStreamCallbackFlags);
size_t const _channel_count;
size_t const _sample_rate;
@ -73,10 +75,9 @@ namespace tc {
std::mutex consumers_lock;
std::deque<std::shared_ptr<AudioConsumer>> _consumers;
std::recursive_mutex input_stream_lock;
const PaDeviceInfo* _current_device = nullptr;
PaDeviceIndex _current_device_index = paNoDevice;
PaStream* input_stream = nullptr;
std::recursive_mutex input_source_lock;
bool input_recording{false};
std::shared_ptr<::AudioInputSource> input_source{};
float _volume = 1.f;
};

View File

@ -7,7 +7,7 @@ using namespace std;
using namespace tc::audio;
/* technique based on http://www.vttoth.com/CMS/index.php/technical-notes/68 */
inline float merge_ab(float a, float b) {
inline constexpr float merge_ab(float a, float b) {
/*
* Form: A,B := [0;n]
* Z = 2(A + B) - (2/n) * A * B - n
@ -21,6 +21,10 @@ inline float merge_ab(float a, float b) {
return (2 * (a + b) - a * b - 2) - 1;
}
static_assert(merge_ab(1, 0) == 1);
static_assert(merge_ab(0, 1) == 1);
static_assert(merge_ab(1, 1) == 1);
bool merge::merge_sources(void *_dest, void *_src_a, void *_src_b, size_t channels, size_t samples) {
auto dest = (float*) _dest;
auto src_a = (float*) _src_a;

View File

@ -172,26 +172,11 @@ NAN_METHOD(AudioRecorderWrapper::_start) {
return;
}
unique_ptr<Nan::Persistent<v8::Function>> _callback = make_unique<Nan::Persistent<v8::Function>>(info[0].As<v8::Function>());
unique_ptr<Nan::Persistent<v8::Object>> _recorder = make_unique<Nan::Persistent<v8::Object>>(info.Holder());
auto input = ObjectWrap::Unwrap<AudioRecorderWrapper>(info.Holder())->_input;
auto _async_callback = Nan::async_callback([call = std::move(_callback), recorder = move(_recorder)](bool result) mutable {
Nan::HandleScope scope;
auto callback_function = call->Get(Nan::GetCurrentContext()->GetIsolate());
v8::Local<v8::Value> argv[1];
argv[0] = v8::Boolean::New(Nan::GetCurrentContext()->GetIsolate(), result);
callback_function->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
recorder->Reset();
call->Reset();
}).option_destroyed_execute(true);
auto handle = ObjectWrap::Unwrap<AudioRecorderWrapper>(info.Holder());
auto input = handle->_input;
std::thread([_async_callback, input]{
_async_callback(input->record());
}).detach();
v8::Local<v8::Value> argv[1];
argv[0] = v8::Boolean::New(Nan::GetCurrentContext()->GetIsolate(), input->record());
(void) info[0].As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
}
NAN_METHOD(AudioRecorderWrapper::_started) {

View File

@ -159,11 +159,11 @@ void TransferObjectWrap::do_wrap(v8::Local<v8::Object> object) {
auto direction = source ? "upload" : "download";
Nan::Set(object,
Nan::New<v8::String>("direction").ToLocalChecked(),
v8::String::NewFromUtf8(Nan::GetCurrentContext()->GetIsolate(), direction).ToLocalChecked()
Nan::New<v8::String>(direction).ToLocalChecked()
);
Nan::Set(object,
Nan::New<v8::String>("name").ToLocalChecked(),
v8::String::NewFromUtf8(Nan::GetCurrentContext()->GetIsolate(), this->target()->name().c_str()).ToLocalChecked()
Nan::New<v8::String>(this->target()->name().c_str()).ToLocalChecked()
);
if(source) {

View File

@ -0,0 +1,114 @@
/// <reference path="../../exports/exports.d.ts" />
console.log("HELLO WORLD");
module.paths.push("../../build/linux_x64");
module.paths.push("../../build/win32_64");
//LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libasan.so.5
const os = require('os');
//process.dlopen(module, '/usr/lib/x86_64-linux-gnu/libasan.so.5',
// os.constants.dlopen.RTLD_NOW);
import * as fs from "fs";
const original_require = require;
require = (module => original_require(__dirname + "/../../../build/linux_x64/" + module + ".node")) as any;
import * as handle from "teaclient_connection";
require = original_require;
const connection_list = [];
const connection = handle.spawn_server_connection();
const client_list = [];
console.dir(handle);
console.log("Query devices...");
console.log("Devices: %o", handle.audio.available_devices());
console.log("Current playback device: %o", handle.audio.playback.current_device());
//handle.audio.playback.set_device(14);
//console.log("Current playback device: %o", handle.audio.playback.current_device());
const stream = handle.audio.playback.create_stream();
console.log("Own stream: %o", stream);
for(let i = 0; i < 12; i++) {
const recorder = handle.audio.record.create_recorder();
for(const device of handle.audio.available_devices()) {
if(!device.input_supported)
continue;
if(device.name != "pulse")
continue;
console.log("Found pulse at %o", device.device_index);
recorder.set_device(device.device_index, () => {
recorder.start(flag => console.log("X: " + flag));
const consumer = recorder.create_consumer();
consumer.create_filter_threshold(2);
});
break;
}
}
/* -1 => default device */
const recorder = handle.audio.record.create_recorder();
console.log("Have device: %o", recorder);
console.log("Device: %o", recorder.get_device());
if(recorder.get_device() == -1) {
console.log("Looking for devices");
for(const device of handle.audio.available_devices()) {
if(!device.input_supported)
continue;
if(device.name != "pulse")
continue;
console.log("Found pulse at %o", device.device_index);
recorder.set_device(device.device_index, () => {});
}
}
console.log("Device: %o", recorder.get_device());
recorder.start(() => {});
console.log("Started: %o", recorder.started());
const consumer = recorder.create_consumer();
{
const filter = consumer.create_filter_threshold(.5);
filter.set_margin_frames(10);
/*
filter.set_analyze_filter(value => {
console.log(value);
})
*/
}
{
//const filter = consumer.create_filter_vad(3);
//console.log("Filter name: %s; Filter level: %d; Filter margin: %d", filter.get_name(), filter.get_level(), filter.get_margin_frames());
}
{
const consume = consumer.create_filter_state();
setTimeout(() => {
console.log("Silence now!");
consume.set_consuming(true);
setTimeout(() => {
console.log("Speak now!");
consume.set_consuming(false);
}, 1000);
}, 1000);
}
setInterval(() => {
if("gc" in global) {
console.log("GC");
global.gc();
}
}, 1000);
let a_map = [consumer, recorder];
/* keep the object alive */
setTimeout(() => {
connection.connected();
a_map = a_map.filter(e => true);
}, 1000);