/*
 * Audio input implementation, part 1: AudioConsumer and the lifecycle / device handling of AudioInput.
 *
 * NOTE(review): this text looks like it went through a lossy extraction step. Line breaks are collapsed and
 * everything that used to stand between angle brackets appears to be missing (the three bare #include
 * directives, the std::make_unique / std::make_shared calls without a type, "const std::shared_ptr &device").
 * Restore those tokens from version control before compiling; they are intentionally not guessed here.
 *
 * AudioConsumer::AudioConsumer(channel_count, sample_rate, frame_size)
 *   Stores the three values. If frame_size > 0 a reframer is created from (channel_count, frame_size) and its
 *   on_frame callback forwards every frame to handle_framed_data() with frame_size samples.
 *
 * AudioConsumer::handle_framed_data(buffer, samples)
 *   Copies the on_read callback while holding on_read_lock, releases the lock and then invokes the copy
 *   (if one is set), so the callback itself runs without the lock held.
 *
 * AudioConsumer::process_data(buffer, samples)
 *   Feeds the data through the reframer if one exists, otherwise passes it straight to handle_framed_data().
 *
 * AudioInput::AudioInput(channels, sample_rate)
 *   Constructs input_buffer with (sample_rate * channels * sizeof(float) * kInputBufferCapacityMs) / 1000,
 *   creates the event loop entry for this instance and hands a callback to audio::initialize(). The callback
 *   only holds a weak reference to initialize_hook_handle; it returns early if the handle is gone or its
 *   input pointer was cleared, and otherwise creates and initializes an audio processor while holding the
 *   handle mutex. NOTE(review): when audio::initialize() invokes the callback is not visible here.
 *
 * AudioInput::~AudioInput()
 *   Clears the hook handle's input pointer under the handle mutex, cancels the event loop entry, calls
 *   execute_lock(true) on it (presumably waits for a running execution - confirm), unregisters the audio
 *   processor from global_audio_output if one was set and finally closes the device.
 *
 * AudioInput::set_device(device)
 *   No-op if the device is already the current one, otherwise closes the current device and stores the new one.
 *   NOTE(review): close_device() locks input_source_lock again while it is still held here, so this only
 *   works if input_source_lock is a recursive mutex - confirm against the header.
 *
 * AudioInput::close_device()
 *   Under input_source_lock: detaches from the recorder (remove_consumer, stop_if_possible, reset) if there is
 *   one, then clears resampler_ and input_device.
 *
 * AudioInput::record(error)
 *   Under input_source_lock: fails with "no device" when no device is set, returns true when a recorder already
 *   exists, otherwise obtains a recorder from the device (failing with "failed to get recorder"), creates a
 *   resampler when the recorder sample rate differs from sample_rate(), registers this instance as consumer
 *   and starts the recorder. If start() fails the consumer is removed and the recorder dropped; resampler_ is
 *   left as it is in that case.
 *
 * AudioInput::recording()
 *   Returns whether a recorder is currently set. Reads input_recorder without taking input_source_lock.
 *
 * AudioInput::stop()
 *   Detaches from and drops the recorder if there is one. Unlike close_device() it does not take
 *   input_source_lock and leaves resampler_ and input_device untouched.
 */
#include #include #include #include "./AudioInput.h" #include "./AudioReframer.h" #include "./AudioResampler.h" #include "./AudioMerger.h" #include "./AudioGain.h" #include "./AudioInterleaved.h" #include "./AudioOutput.h" #include "./processing/AudioProcessor.h" #include "../logger.h" #include "AudioEventLoop.h" using namespace std; using namespace tc; using namespace tc::audio; AudioConsumer::AudioConsumer(size_t channel_count, size_t sample_rate, size_t frame_size) : channel_count{channel_count}, sample_rate{sample_rate}, frame_size{frame_size} { if(this->frame_size > 0) { this->reframer = std::make_unique(channel_count, frame_size); this->reframer->on_frame = [&](const void* buffer) { this->handle_framed_data(buffer, this->frame_size); }; } } void AudioConsumer::handle_framed_data(const void *buffer, size_t samples) { std::unique_lock read_callback_lock(this->on_read_lock); auto function = this->on_read; /* copy */ read_callback_lock.unlock(); if(!function) return; function(buffer, samples); } void AudioConsumer::process_data(const void *buffer, size_t samples) { if(this->reframer) { this->reframer->process(buffer, samples); } else { this->handle_framed_data(buffer, samples); } } extern tc::audio::AudioOutput* global_audio_output; AudioInput::AudioInput(size_t channels, size_t sample_rate) : channel_count_{channels}, sample_rate_{sample_rate}, input_buffer{(sample_rate * channels * sizeof(float) * kInputBufferCapacityMs) / 1000} { this->event_loop_entry = std::make_shared(this); { this->initialize_hook_handle = std::make_shared(); this->initialize_hook_handle->input = this; std::weak_ptr weak_handle{this->initialize_hook_handle}; audio::initialize([weak_handle] { auto handle = weak_handle.lock(); if(!handle) { return; } std::lock_guard lock{handle->mutex}; if(!handle->input) { return; } auto processor = std::make_shared(); if(!processor->initialize()) { log_error(category::audio, tr("Failed to initialize audio processor.")); return; } 
/*
 * Still inside the audio::initialize() callback of the AudioInput constructor: the freshly initialized
 * processor is registered at global_audio_output and stored in the input's audio_processor_ member.
 * The destructor, set_device(), close_device(), record(), recording() and stop() follow on the same line.
 */
global_audio_output->register_audio_processor(processor); handle->input->audio_processor_ = processor; }); } } AudioInput::~AudioInput() { { std::lock_guard lock{this->initialize_hook_handle->mutex}; this->initialize_hook_handle->input = nullptr; } { audio::encode_event_loop->cancel(this->event_loop_entry); this->event_loop_entry->execute_lock(true); } if(this->audio_processor_) { assert(global_audio_output); global_audio_output->unregister_audio_processor(this->audio_processor_); this->audio_processor_ = nullptr; } this->close_device(); } void AudioInput::set_device(const std::shared_ptr &device) { std::lock_guard lock{this->input_source_lock}; if(device == this->input_device) { return; } this->close_device(); this->input_device = device; } void AudioInput::close_device() { std::lock_guard lock{this->input_source_lock}; if(this->input_recorder) { this->input_recorder->remove_consumer(this); this->input_recorder->stop_if_possible(); this->input_recorder.reset(); } this->resampler_ = nullptr; this->input_device = nullptr; } bool AudioInput::record(std::string& error) { std::lock_guard lock{this->input_source_lock}; if(!this->input_device) { error = "no device"; return false; } if(this->input_recorder) { return true; } this->input_recorder = this->input_device->record(); if(!this->input_recorder) { error = "failed to get recorder"; return false; } if(this->input_recorder->sample_rate() != this->sample_rate()) { this->resampler_ = std::make_unique(this->input_recorder->sample_rate(), this->sample_rate(), this->channel_count_); } this->input_recorder->register_consumer(this); if(!this->input_recorder->start(error)) { this->input_recorder->remove_consumer(this); this->input_recorder.reset(); return false; } return true; } bool AudioInput::recording() { return this->input_recorder != nullptr; } void AudioInput::stop() { if(!this->input_recorder) return; this->input_recorder->remove_consumer(this); this->input_recorder->stop_if_possible(); this->input_recorder.reset(); } 
std::vector> AudioInput::consumers() { std::vector> result{}; result.reserve(10); std::lock_guard consumer_lock{this->consumers_mutex}; result.reserve(this->consumers_.size()); this->consumers_.erase(std::remove_if(this->consumers_.begin(), this->consumers_.end(), [&](const std::weak_ptr& weak_consumer) { auto consumer = weak_consumer.lock(); if(!consumer) { return true; } result.push_back(consumer); return false; }), this->consumers_.end()); return result; } std::shared_ptr AudioInput::create_consumer(size_t frame_length) { auto result = std::shared_ptr(new AudioConsumer(this->channel_count_, this->sample_rate_, frame_length)); { std::lock_guard lock(this->consumers_mutex); this->consumers_.push_back(result); } return result; } void AudioInput::allocate_input_buffer_samples(size_t samples) { const auto expected_byte_size = samples * this->channel_count_ * sizeof(float); if(expected_byte_size > this->input_buffer.capacity()) { log_critical(category::audio, tr("Resampled audio input data would be larger than our input buffer capacity.")); return; } if(this->input_buffer.free_count() < expected_byte_size) { log_warn(category::audio, tr("Audio input buffer overflow.")); const auto free_samples = this->input_buffer.free_count() / this->channel_count_ / sizeof(float); assert(samples >= free_samples); const auto missing_samples = samples - free_samples; this->input_buffer.advance_read_ptr(missing_samples * this->channel_count_ * sizeof(float)); } } void AudioInput::consume(const void *input, size_t sample_count, size_t channels) { constexpr static auto kTempBufferMaxSampleCount{1024 * 8}; float temp_buffer[kTempBufferMaxSampleCount]; /* TODO: Short circuit for silence here */ if(channels != this->channel_count_) { if(channels < 1 || channels > 2) { log_critical(category::audio, tr("AudioInput received audio data with an unsupported channel count of {}."), channels); return; } if(sample_count * this->channel_count_ > kTempBufferMaxSampleCount) { 
log_critical(category::audio, tr("Received audio chunk bigger than our temp stack buffer. Received {} samples but can hold only {}."), sample_count, kTempBufferMaxSampleCount / this->channel_count_); return; } audio::merge::merge_channels_interleaved(temp_buffer, this->channel_count_, input, channels, sample_count); input = temp_buffer; } if(this->resampler_) { const auto expected_size = this->resampler_->estimated_output_size(sample_count); this->allocate_input_buffer_samples(expected_size); size_t resampled_sample_count{expected_size}; if(!this->resampler_->process(this->input_buffer.write_ptr(), input, sample_count, resampled_sample_count)) { log_error(category::audio, tr("Failed to resample input audio.")); return; } this->input_buffer.advance_write_ptr(resampled_sample_count * this->channel_count_ * sizeof(float)); } else { this->allocate_input_buffer_samples(sample_count); const auto sample_byte_size = sample_count * this->channel_count_ * sizeof(float); memcpy(this->input_buffer.write_ptr(), input, sample_byte_size); this->input_buffer.advance_write_ptr(sample_byte_size); } audio::encode_event_loop->schedule(this->event_loop_entry); } void AudioInput::process_audio() { const auto chunk_sample_count = (kChunkSizeMs * this->sample_rate_) / 1000; while(true) { auto available_sample_count = this->input_buffer.fill_count() / this->channel_count_ / sizeof(float); if(available_sample_count < chunk_sample_count) { break; } auto input = this->input_buffer.read_ptr(); /* * It's save to mutate the current memory. * If overflows occur it could lead to wired artifacts but all memory access is save. 
*/ this->process_audio_chunk((void*) input); this->input_buffer.advance_read_ptr(chunk_sample_count * this->channel_count_ * sizeof(float)); } } void AudioInput::process_audio_chunk(void *chunk) { constexpr static auto kTempSampleBufferSize{1024 * 8}; constexpr static auto kMaxChannelCount{32}; const auto chunk_sample_count = this->chunk_sample_count(); float temp_sample_buffer[kTempSampleBufferSize]; float out_sample_buffer[kTempSampleBufferSize]; assert(memset(temp_sample_buffer, 0, sizeof(float) * kTempSampleBufferSize)); assert(memset(out_sample_buffer, 0, sizeof(float) * kTempSampleBufferSize)); if(auto processor{this->audio_processor_}; processor) { assert(kTempSampleBufferSize >= chunk_sample_count * this->channel_count_ * sizeof(float)); audio::deinterleave(temp_sample_buffer, (const float*) chunk, this->channel_count_, chunk_sample_count); webrtc::StreamConfig stream_config{(int) this->sample_rate_, this->channel_count_}; float* channel_ptr[kMaxChannelCount]; assert(memset(channel_ptr, 0, sizeof(float*) * kMaxChannelCount)); for(size_t channel{0}; channel < this->channel_count_; channel++) { channel_ptr[channel] = temp_sample_buffer + (channel * chunk_sample_count); } auto process_statistics = processor->process_stream(stream_config, channel_ptr); if(!process_statistics.has_value()) { /* TODO: Some kind of error message? */ return; } audio::interleave_vec(out_sample_buffer, channel_ptr, this->channel_count_, chunk_sample_count); chunk = out_sample_buffer; } /* TODO: Is this even needed if we have the processor? */ audio::apply_gain(chunk, this->channel_count_, chunk_sample_count, this->volume_); auto begin = std::chrono::system_clock::now(); for(const auto& consumer : this->consumers()) { consumer->process_data(out_sample_buffer, chunk_sample_count); } auto end = std::chrono::system_clock::now(); auto ms = std::chrono::duration_cast(end - begin).count(); if(ms > 5) { log_warn(category::audio, tr("Processing of audio input needed {}ms. 
This could be an issue!"), std::chrono::duration_cast(end - begin).count()); } } void AudioInput::EventLoopCallback::event_execute(const chrono::system_clock::time_point &point) { this->input->process_audio(); }