#include "./AudioConsumer.h" #include "./AudioRecorder.h" #include "./AudioFilter.h" #include "../AudioInput.h" #include "../filter/Filter.h" #include "../filter/FilterVad.h" #include "../filter/FilterThreshold.h" #include "../filter/FilterState.h" #include "../../logger.h" #include /* Must be last */ using namespace std; using namespace tc::audio; using namespace tc::audio::recorder; inline v8::PropertyAttribute operator|(const v8::PropertyAttribute& a, const v8::PropertyAttribute& b) { return (v8::PropertyAttribute) ((unsigned) a | (unsigned) b); } NAN_MODULE_INIT(AudioConsumerWrapper::Init) { auto klass = Nan::New(AudioConsumerWrapper::NewInstance); klass->SetClassName(Nan::New("AudioConsumer").ToLocalChecked()); klass->InstanceTemplate()->SetInternalFieldCount(1); Nan::SetPrototypeMethod(klass, "get_filters", AudioConsumerWrapper::_get_filters); Nan::SetPrototypeMethod(klass, "unregister_filter", AudioConsumerWrapper::_unregister_filter); Nan::SetPrototypeMethod(klass, "create_filter_vad", AudioConsumerWrapper::_create_filter_vad); Nan::SetPrototypeMethod(klass, "create_filter_threshold", AudioConsumerWrapper::_create_filter_threshold); Nan::SetPrototypeMethod(klass, "create_filter_state", AudioConsumerWrapper::_create_filter_state); Nan::SetPrototypeMethod(klass, "get_filter_mode", AudioConsumerWrapper::_get_filter_mode); Nan::SetPrototypeMethod(klass, "set_filter_mode", AudioConsumerWrapper::_set_filter_mode); Nan::SetPrototypeMethod(klass, "rnnoise_enabled", AudioConsumerWrapper::rnnoise_enabled); Nan::SetPrototypeMethod(klass, "toggle_rnnoise", AudioConsumerWrapper::toggle_rnnoise); constructor_template().Reset(klass); constructor().Reset(Nan::GetFunction(klass).ToLocalChecked()); } NAN_METHOD(AudioConsumerWrapper::NewInstance) { if(!info.IsConstructCall()) Nan::ThrowError("invalid invoke!"); } AudioConsumerWrapper::AudioConsumerWrapper(AudioRecorderWrapper* h, const std::shared_ptr &handle) : _handle(handle), _recorder(h) { log_allocate("AudioConsumerWrapper", this); { lock_guard read_lock{handle->on_read_lock}; handle->on_read = [&](const void* buffer, size_t length){ this->process_data(buffer, length); }; } #ifdef DO_DEADLOCK_REF this->_recorder->js_ref(); /* FML Mem leak! (In general the consumer live is related to the recorder handle, but for nodejs testing we want to keep this reference ) */ #endif } AudioConsumerWrapper::~AudioConsumerWrapper() { log_free("AudioConsumerWrapper", this); lock_guard lock{this->execute_mutex}; this->unbind(); if(this->_handle->handle) { this->_handle->handle->delete_consumer(this->_handle); this->_handle = nullptr; } for(auto& instance : this->rnnoise_processor) { if(!instance) { continue; } rnnoise_destroy((DenoiseState*) instance); instance = nullptr; } for(auto index{0}; index < kInternalFrameBufferCount; index++) { if(!this->internal_frame_buffer[index]) { continue; } free(this->internal_frame_buffer[index]); this->internal_frame_buffer[index] = nullptr; this->internal_frame_buffer_size[index] = 0; } #ifdef DO_DEADLOCK_REF if(this->_recorder) { this->_recorder->js_unref(); } #endif } void AudioConsumerWrapper::do_wrap(const v8::Local &obj) { this->Wrap(obj); this->_call_data = Nan::async_callback([&] { Nan::HandleScope scope; auto handle = this->handle(); v8::Local callback_function = Nan::Get(handle, Nan::New("callback_data").ToLocalChecked()).FromMaybe(v8::Local{}); if(callback_function.IsEmpty() || callback_function->IsNullOrUndefined() || !callback_function->IsFunction()) { lock_guard lock(this->_data_lock); this->_data_entries.clear(); } std::unique_ptr buffer; while(true) { { lock_guard lock(this->_data_lock); if(this->_data_entries.empty()) break; buffer = move(this->_data_entries.front()); this->_data_entries.pop_front(); } const auto byte_length = buffer->sample_count * this->_handle->channel_count * 4; auto js_buffer = v8::ArrayBuffer::New(Nan::GetCurrentContext()->GetIsolate(), byte_length); auto js_fbuffer = v8::Float32Array::New(js_buffer, 0, byte_length / 4); memcpy(js_buffer->GetContents().Data(), buffer->buffer, byte_length); v8::Local argv[1]; argv[0] = js_fbuffer; (void) callback_function.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv); } }); this->_call_ended = Nan::async_callback([&]{ Nan::HandleScope scope; auto handle = this->handle(); v8::Local callback_function = Nan::Get(handle, Nan::New("callback_ended").ToLocalChecked()).FromMaybe(v8::Local{}); if(callback_function.IsEmpty() || callback_function->IsNullOrUndefined() || !callback_function->IsFunction()) return; (void) callback_function.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr); }); this->_call_started = Nan::async_callback([&]{ Nan::HandleScope scope; auto handle = this->handle(); v8::Local callback_function = Nan::Get(handle, Nan::New("callback_started").ToLocalChecked()).FromMaybe(v8::Local{}); if(callback_function.IsEmpty() || callback_function->IsNullOrUndefined() || !callback_function->IsFunction()) return; (void) callback_function.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr); }); Nan::DefineOwnProperty(this->handle(), Nan::New("frameSize").ToLocalChecked(), Nan::New((uint32_t) this->_handle->frame_size), v8::ReadOnly | v8::DontDelete); Nan::DefineOwnProperty(this->handle(), Nan::New("sampleRate").ToLocalChecked(), Nan::New((uint32_t) this->_handle->sample_rate), v8::ReadOnly | v8::DontDelete); Nan::DefineOwnProperty(this->handle(), Nan::New("channelCount").ToLocalChecked(), Nan::New((uint32_t) this->_handle->channel_count), v8::ReadOnly | v8::DontDelete); } void AudioConsumerWrapper::unbind() { if(this->_handle) { lock_guard lock{this->_handle->on_read_lock}; this->_handle->on_read = nullptr; } } static const float kRnNoiseScale = -INT16_MIN; void AudioConsumerWrapper::process_data(const void *buffer, size_t samples) { if(samples != 960) { logger::error(logger::category::audio, tr("Received audio frame with invalid sample count (Expected 960, Received {})"), samples); return; } lock_guard lock{this->execute_mutex}; if(this->filter_mode_ == FilterMode::BLOCK) { return; } /* apply input modifiers */ if(this->rnnoise) { /* TODO: don't call reserve_internal_buffer every time and assume the buffers are initialized */ /* TODO: Maybe find out if the microphone is some kind of pseudo stero so we can handle it as mono? */ if(this->_handle->channel_count > 1) { auto channel_count = this->_handle->channel_count; this->reserve_internal_buffer(0, samples * channel_count * sizeof(float)); this->reserve_internal_buffer(1, samples * channel_count * sizeof(float)); for(size_t channel{0}; channel < channel_count; channel++) { auto target_buffer = (float*) this->internal_frame_buffer[1]; auto source_buffer = (const float*) buffer + channel; for(size_t index{0}; index < samples; index++) { *target_buffer = *source_buffer * kRnNoiseScale; source_buffer += channel_count; target_buffer++; } /* rnnoise uses a frame size of 480 */ this->initialize_rnnoise(channel); rnnoise_process_frame((DenoiseState*) this->rnnoise_processor[channel], (float*) this->internal_frame_buffer[0] + channel * samples, (const float*) this->internal_frame_buffer[1]); rnnoise_process_frame((DenoiseState*) this->rnnoise_processor[channel], (float*) this->internal_frame_buffer[0] + channel * samples + 480, (const float*) this->internal_frame_buffer[1] + 480); } const float* channel_buffer_ptr[kMaxChannelCount]; for(size_t channel{0}; channel < channel_count; channel++) { channel_buffer_ptr[channel] = (const float*) this->internal_frame_buffer[0] + channel * samples; } /* now back again to interlanced */ auto target_buffer = (float*) this->internal_frame_buffer[1]; for(size_t index{0}; index < samples; index++) { for(size_t channel{0}; channel < channel_count; channel++) { *target_buffer = *(channel_buffer_ptr[channel]++) / kRnNoiseScale; target_buffer++; } } buffer = this->internal_frame_buffer[1]; } else { /* rnnoise uses a frame size of 480 */ this->reserve_internal_buffer(0, samples * sizeof(float)); auto target_buffer = (float*) this->internal_frame_buffer[0]; for(size_t index{0}; index < samples; index++) { target_buffer[index] = ((float*) buffer)[index] * kRnNoiseScale; } this->initialize_rnnoise(0); rnnoise_process_frame((DenoiseState*) this->rnnoise_processor[0], target_buffer, target_buffer); rnnoise_process_frame((DenoiseState*) this->rnnoise_processor[0], &target_buffer[480], &target_buffer[480]); buffer = target_buffer; for(size_t index{0}; index < samples; index++) { target_buffer[index] /= kRnNoiseScale; } } } bool should_process{true}; if(this->filter_mode_ == FilterMode::FILTER) { auto filters = this->filters(); for(const auto& filter : filters) { auto _filter = filter->filter(); if(!_filter) continue; if(_filter->frame_size() != samples) { cerr << "Tried to use a filter, but frame size does not match!" << endl; continue; } if(!_filter->process(buffer)) { should_process = false; break; } } } else if(this->filter_mode_ != FilterMode::BYPASS) { should_process = false; } if(!should_process) { if(!this->last_consumed) { this->last_consumed = true; this->_call_ended(); unique_lock native_read_lock(this->native_read_callback_lock); if(this->native_read_callback) { auto callback = this->native_read_callback; /* copy */ native_read_lock.unlock(); callback(nullptr, 0); /* notify end */ } } return; } if(this->last_consumed) { this->_call_started(); } this->last_consumed = false; { unique_lock native_read_lock(this->native_read_callback_lock); if(this->native_read_callback) { auto callback = this->native_read_callback; /* copy */ native_read_lock.unlock(); callback(buffer, samples); return; } } auto byte_length = samples * this->_handle->channel_count * 4; auto buf = make_unique(); buf->buffer = malloc(byte_length); memcpy(buf->buffer, buffer, byte_length); buf->sample_count = samples; { lock_guard data_lock{this->_data_lock}; this->_data_entries.push_back(move(buf)); } this->_call_data(); } std::shared_ptr AudioConsumerWrapper::create_filter(const std::string& name, const std::shared_ptr &impl) { auto result = shared_ptr(new AudioFilterWrapper(name, impl), [](AudioFilterWrapper* ptr) { assert(v8::Isolate::GetCurrent()); ptr->Unref(); }); /* wrap into object */ { auto js_object = Nan::NewInstance(Nan::New(AudioFilterWrapper::constructor()), 0, nullptr).ToLocalChecked(); result->do_wrap(js_object); result->Ref(); } { lock_guard lock(this->filter_mutex_); this->filter_.push_back(result); } return result; } void AudioConsumerWrapper::delete_filter(const AudioFilterWrapper* filter) { shared_ptr handle; /* need to keep the handle 'till everything has been finished */ { lock_guard lock(this->filter_mutex_); for(auto& c : this->filter_) { if(&*c == filter) { handle = c; break; } } if(!handle) { return; } { auto it = find(this->filter_.begin(), this->filter_.end(), handle); if(it != this->filter_.end()) { this->filter_.erase(it); } } } { lock_guard lock(this->execute_mutex); /* ensure that the filter isn't used right now */ handle->_filter = nullptr; } } void AudioConsumerWrapper::reserve_internal_buffer(int index, size_t target) { assert(index < kInternalFrameBufferCount); if(this->internal_frame_buffer_size[index] < target) { if(this->internal_frame_buffer_size[index]) { ::free(this->internal_frame_buffer[index]); } this->internal_frame_buffer[index] = malloc(target); this->internal_frame_buffer_size[index] = target; } } void AudioConsumerWrapper::initialize_rnnoise(int channel) { if(!this->rnnoise_processor[channel]) { this->rnnoise_processor[channel] = rnnoise_create(nullptr); } } NAN_METHOD(AudioConsumerWrapper::_get_filters) { auto handle = ObjectWrap::Unwrap(info.Holder()); auto filters = handle->filters(); auto result = Nan::New((uint32_t) filters.size()); for(uint32_t index = 0; index < filters.size(); index++) Nan::Set(result, index, filters[index]->handle()); info.GetReturnValue().Set(result); } NAN_METHOD(AudioConsumerWrapper::_unregister_filter) { auto handle = ObjectWrap::Unwrap(info.Holder()); if(info.Length() != 1 || !info[0]->IsObject()) { Nan::ThrowError("invalid argument"); return; } if(!Nan::New(AudioFilterWrapper::constructor_template())->HasInstance(info[0])) { Nan::ThrowError("invalid consumer"); return; } auto consumer = ObjectWrap::Unwrap(info[0]->ToObject(Nan::GetCurrentContext()).ToLocalChecked()); handle->delete_filter(consumer); } NAN_METHOD(AudioConsumerWrapper::_create_filter_vad) { auto handle = ObjectWrap::Unwrap(info.Holder()); auto consumer = handle->_handle; assert(consumer); /* should never be null! */ if(info.Length() != 1 || !info[0]->IsNumber()) { Nan::ThrowError("invalid argument"); return; } string error; auto filter = make_shared(consumer->channel_count,consumer->sample_rate,consumer->frame_size); if(!filter->initialize(error, info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0), 2)) { Nan::ThrowError(Nan::New("failed to initialize filter (" + error + ")").ToLocalChecked()); return; } auto object = handle->create_filter("vad", filter); info.GetReturnValue().Set(object->handle()); } NAN_METHOD(AudioConsumerWrapper::_create_filter_threshold) { auto handle = ObjectWrap::Unwrap(info.Holder()); auto consumer = handle->_handle; assert(consumer); /* should never be null! */ if(info.Length() != 1 || !info[0]->IsNumber()) { Nan::ThrowError("invalid argument"); return; } string error; auto filter = make_shared(consumer->channel_count,consumer->sample_rate,consumer->frame_size); if(!filter->initialize(error, (float) info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0), 2)) { Nan::ThrowError(Nan::New("failed to initialize filter (" + error + ")").ToLocalChecked()); return; } auto object = handle->create_filter("threshold", filter); info.GetReturnValue().Set(object->handle()); } NAN_METHOD(AudioConsumerWrapper::_create_filter_state) { auto handle = ObjectWrap::Unwrap(info.Holder()); auto consumer = handle->_handle; assert(consumer); /* should never be null! */ string error; auto filter = make_shared(consumer->channel_count, consumer->sample_rate, consumer->frame_size); if(!filter->initialize(error)) { Nan::ThrowError(Nan::New("failed to initialize filter (" + error + ")").ToLocalChecked()); return; } auto object = handle->create_filter("state", filter); info.GetReturnValue().Set(object->handle()); } NAN_METHOD(AudioConsumerWrapper::_get_filter_mode) { auto handle = ObjectWrap::Unwrap(info.Holder()); info.GetReturnValue().Set((int) handle->filter_mode_); } NAN_METHOD(AudioConsumerWrapper::_set_filter_mode) { auto handle = ObjectWrap::Unwrap(info.Holder()); if(info.Length() != 1 || !info[0]->IsNumber()) { Nan::ThrowError("invalid argument"); return; } auto value = info[0].As()->Int32Value(info.GetIsolate()->GetCurrentContext()).FromMaybe(0); handle->filter_mode_ = (FilterMode) value; } NAN_METHOD(AudioConsumerWrapper::rnnoise_enabled) { auto handle = ObjectWrap::Unwrap(info.Holder()); info.GetReturnValue().Set(handle->rnnoise); } NAN_METHOD(AudioConsumerWrapper::toggle_rnnoise) { auto handle = ObjectWrap::Unwrap(info.Holder()); if(info.Length() != 1 || !info[0]->IsBoolean()) { Nan::ThrowError("invalid argument"); return; } handle->rnnoise = info[0]->BooleanValue(info.GetIsolate()); }