From 82333000d52af66d8b50ee8abfd23153063539ca Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Mon, 29 Mar 2021 11:36:37 +0200 Subject: [PATCH] Simplified the audio filters and reworked the audio sender logic --- native/serverconnection/CMakeLists.txt | 12 +- native/serverconnection/exports/exports.d.ts | 1 - .../serverconnection/src/audio/AudioInput.cpp | 44 ++- .../serverconnection/src/audio/AudioInput.h | 32 +- .../src/audio/AudioOutputSource.cpp | 7 +- .../src/audio/AudioReframer.cpp | 52 ++- .../src/audio/AudioReframer.h | 20 +- .../src/audio/codec/Converter.h | 136 ++++--- .../src/audio/codec/OpusConverter.cpp | 92 ++++- .../src/audio/codec/OpusConverter.h | 80 +++-- .../src/audio/filter/Filter.h | 30 +- .../src/audio/filter/FilterState.cpp | 14 +- .../src/audio/filter/FilterState.h | 16 +- .../src/audio/filter/FilterThreshold.cpp | 36 +- .../src/audio/filter/FilterThreshold.h | 37 +- .../src/audio/filter/FilterVad.cpp | 125 ++++--- .../src/audio/filter/FilterVad.h | 43 +-- .../src/audio/js/AudioConsumer.cpp | 246 ++++++------- .../src/audio/js/AudioConsumer.h | 51 ++- .../src/audio/js/AudioFilter.cpp | 13 +- .../src/audio/js/AudioFilter.h | 2 +- .../src/audio/js/AudioRecorder.cpp | 7 +- .../src/connection/ServerConnection.cpp | 4 +- .../src/connection/audio/AudioSender.cpp | 331 ++++++++++-------- .../src/connection/audio/AudioSender.h | 81 ++--- .../src/connection/audio/VoiceConnection.cpp | 16 +- 26 files changed, 854 insertions(+), 674 deletions(-) diff --git a/native/serverconnection/CMakeLists.txt b/native/serverconnection/CMakeLists.txt index 8efab94..1c7a9c6 100644 --- a/native/serverconnection/CMakeLists.txt +++ b/native/serverconnection/CMakeLists.txt @@ -129,9 +129,6 @@ endif () find_package(Soxr REQUIRED) include_directories(${Soxr_INCLUDE_DIR}) -find_package(fvad REQUIRED) -include_directories(${fvad_INCLUDE_DIR}) - find_package(Opus REQUIRED) find_package(spdlog REQUIRED) find_package(WebRTCAudioProcessing REQUIRED) @@ -147,7 +144,6 @@ 
set(REQUIRED_LIBRARIES DataPipes::core::static ${Soxr_LIBRARIES_STATIC} - ${fvad_LIBRARIES_STATIC} opus::static ${Ed25519_LIBRARIES_STATIC} @@ -158,6 +154,14 @@ set(REQUIRED_LIBRARIES Nan::Helpers ) +# We deprecated FVad in favour of WebRTC audio processing and its VAD detector +if(USE_FVAD) + find_package(fvad REQUIRED) + include_directories(${fvad_INCLUDE_DIR}) + list(APPEND REQUIRED_LIBRARIES ${fvad_LIBRARIES_STATIC}) + add_compile_definitions(USE_FVAD) +endif() + if (SOUNDIO_BACKED) list(APPEND REQUIRED_LIBRARIES soundio::static) else() diff --git a/native/serverconnection/exports/exports.d.ts b/native/serverconnection/exports/exports.d.ts index 4033ebf..93720f1 100644 --- a/native/serverconnection/exports/exports.d.ts +++ b/native/serverconnection/exports/exports.d.ts @@ -225,7 +225,6 @@ export namespace audio { export interface AudioConsumer { readonly sampleRate: number; readonly channelCount: number; - readonly frameSize: number; get_filters() : ConsumeFilter[]; unregister_filter(filter: ConsumeFilter); diff --git a/native/serverconnection/src/audio/AudioInput.cpp b/native/serverconnection/src/audio/AudioInput.cpp index 123fbc7..ecf5126 100644 --- a/native/serverconnection/src/audio/AudioInput.cpp +++ b/native/serverconnection/src/audio/AudioInput.cpp @@ -22,7 +22,7 @@ AudioConsumer::AudioConsumer(size_t channel_count, size_t sample_rate, size_t fr sample_rate{sample_rate}, frame_size{frame_size} { if(this->frame_size > 0) { - this->reframer = std::make_unique(channel_count, frame_size); + this->reframer = std::make_unique(channel_count, frame_size); this->reframer->on_frame = [&](const void* buffer) { this->handle_framed_data(buffer, this->frame_size); }; } } @@ -164,14 +164,14 @@ void AudioInput::stop() { this->input_recorder.reset(); } -std::vector> AudioInput::consumers() { - std::vector> result{}; +std::vector> AudioInput::consumers() { + std::vector> result{}; result.reserve(10); std::lock_guard consumer_lock{this->consumers_mutex}; 
result.reserve(this->consumers_.size()); - this->consumers_.erase(std::remove_if(this->consumers_.begin(), this->consumers_.end(), [&](const std::weak_ptr& weak_consumer) { + this->consumers_.erase(std::remove_if(this->consumers_.begin(), this->consumers_.end(), [&](const std::weak_ptr& weak_consumer) { auto consumer = weak_consumer.lock(); if(!consumer) { return true; @@ -184,13 +184,22 @@ std::vector> AudioInput::consumers() { return result; } -std::shared_ptr AudioInput::create_consumer(size_t frame_length) { - auto result = std::shared_ptr(new AudioConsumer(this->channel_count_, this->sample_rate_, frame_length)); - { - std::lock_guard lock(this->consumers_mutex); - this->consumers_.push_back(result); - } - return result; +void AudioInput::register_consumer(const std::shared_ptr& consumer) { + std::lock_guard lock{this->consumers_mutex}; + this->consumers_.push_back(consumer); +} + +void AudioInput::remove_consumer(const std::shared_ptr &target_consumer) { + std::lock_guard consumer_lock{this->consumers_mutex}; + + this->consumers_.erase(std::remove_if(this->consumers_.begin(), this->consumers_.end(), [&](const std::weak_ptr& weak_consumer) { + auto consumer = weak_consumer.lock(); + if(!consumer) { + return true; + } + + return consumer == target_consumer; + }), this->consumers_.end()); } std::shared_ptr AudioInput::create_level_meter(bool preprocess) { @@ -198,8 +207,8 @@ std::shared_ptr AudioInput::create_level_meter(bool p { std::lock_guard lock{this->consumers_mutex}; - auto& list = preprocess ? this->level_meter_preprocess : this->level_meter_postprocess; - list.push_back(level_meter); + auto& level_meters = preprocess ? 
this->level_meter_preprocess : this->level_meter_postprocess; + level_meters.push_back(level_meter); } return level_meter; @@ -295,6 +304,9 @@ void AudioInput::process_audio_chunk(float *chunk) { assert(memset(temp_sample_buffer, 0, sizeof(float) * kTempSampleBufferSize)); assert(memset(out_sample_buffer, 0, sizeof(float) * kTempSampleBufferSize)); + AudioInputBufferInfo buffer_info{}; + buffer_info.sample_count = chunk_sample_count; + this->invoke_level_meter(true, chunk, this->channel_count_, chunk_sample_count); if(auto processor{this->audio_processor_}; processor) { assert(kTempSampleBufferSize >= chunk_sample_count * this->channel_count_ * sizeof(float)); @@ -316,6 +328,10 @@ void AudioInput::process_audio_chunk(float *chunk) { audio::interleave_vec(out_sample_buffer, channel_ptr, this->channel_count_, chunk_sample_count); chunk = out_sample_buffer; + + if(process_statistics->voice_detected.has_value()) { + buffer_info.vad_detected.emplace(*process_statistics->voice_detected); + } } audio::apply_gain(chunk, this->channel_count_, chunk_sample_count, this->volume_); @@ -323,7 +339,7 @@ void AudioInput::process_audio_chunk(float *chunk) { auto begin = std::chrono::system_clock::now(); for(const auto& consumer : this->consumers()) { - consumer->process_data(out_sample_buffer, chunk_sample_count); + consumer->handle_buffer(buffer_info, out_sample_buffer); } auto end = std::chrono::system_clock::now(); diff --git a/native/serverconnection/src/audio/AudioInput.h b/native/serverconnection/src/audio/AudioInput.h index c235585..7059ba0 100644 --- a/native/serverconnection/src/audio/AudioInput.h +++ b/native/serverconnection/src/audio/AudioInput.h @@ -5,6 +5,7 @@ #include #include #include +#include #include "./AudioSamples.h" #include "./driver/AudioDriver.h" #include "../ring_buffer.h" @@ -12,7 +13,7 @@ namespace tc::audio { class AudioInput; - class InputReframer; + class AudioReframer; class AudioResampler; class AudioProcessor; class AudioInputAudioLevelMeter; @@ 
-30,12 +31,21 @@ namespace tc::audio { private: AudioConsumer(size_t channel_count, size_t sample_rate, size_t frame_size); - std::unique_ptr reframer; + std::unique_ptr reframer; void process_data(const void* /* buffer */, size_t /* samples */); void handle_framed_data(const void* /* buffer */, size_t /* samples */); }; + struct AudioInputBufferInfo { + size_t sample_count{0}; + std::optional vad_detected{}; + }; + + struct AudioInputConsumer { + virtual void handle_buffer(const AudioInputBufferInfo& /* info */, const float* /* buffer */) = 0; + }; + class AudioInput : public AudioDeviceRecord::Consumer { public: AudioInput(size_t /* channels */, size_t /* sample rate */); @@ -49,8 +59,10 @@ namespace tc::audio { [[nodiscard]] bool recording(); void stop(); - [[nodiscard]] std::vector> consumers(); - [[nodiscard]] std::shared_ptr create_consumer(size_t /* frame size */); + [[nodiscard]] std::vector> consumers(); + void register_consumer(const std::shared_ptr& /* consumer */); + void remove_consumer(const std::shared_ptr& /* consumer */); + [[nodiscard]] std::shared_ptr create_level_meter(bool /* pre process */); [[nodiscard]] inline auto audio_processor() { return this->audio_processor_; } @@ -84,25 +96,23 @@ namespace tc::audio { size_t const sample_rate_; std::mutex consumers_mutex{}; - std::deque> consumers_{}; + std::deque> consumers_{}; std::deque> level_meter_preprocess{}; std::deque> level_meter_postprocess{}; - std::recursive_mutex input_source_lock{}; + std::shared_ptr initialize_hook_handle{}; std::shared_ptr event_loop_entry{}; + std::shared_ptr audio_processor_{}; ring_buffer input_buffer; + std::recursive_mutex input_source_lock{}; + std::shared_ptr input_recorder{}; std::unique_ptr resampler_{nullptr}; std::shared_ptr input_device{}; float volume_{1.f}; - std::shared_ptr input_recorder{}; - std::shared_ptr audio_processor_{}; - - std::shared_ptr initialize_hook_handle{}; - void allocate_input_buffer_samples(size_t /* sample count */); [[nodiscard]] 
inline auto chunk_sample_count() const { return (kChunkSizeMs * this->sample_rate_) / 1000; } diff --git a/native/serverconnection/src/audio/AudioOutputSource.cpp b/native/serverconnection/src/audio/AudioOutputSource.cpp index ec4bb71..1f67f97 100644 --- a/native/serverconnection/src/audio/AudioOutputSource.cpp +++ b/native/serverconnection/src/audio/AudioOutputSource.cpp @@ -55,7 +55,6 @@ void AudioOutputSource::apply_fadein() { for(size_t index{0}; index < fade_samples; index++) { const auto offset = (float) ((float) (index + 1) / (float) fade_samples); const auto volume = std::min(log10f(1 - offset) / -2.71828182845904f, 1.f); - for(int channel{0}; channel < this->channel_count_; channel++) { *write_ptr++ *= volume; } @@ -102,7 +101,7 @@ bool AudioOutputSource::pop_samples_(void *target_buffer, size_t target_sample_c if(auto callback{this->on_underflow}; callback) { if(callback(missing_samples)) { /* We've been filled up again. Trying again to fill the output buffer. */ - return this->pop_samples(target_buffer, target_sample_count); + return this->pop_samples_(target_buffer, target_sample_count); } } @@ -126,7 +125,7 @@ bool AudioOutputSource::pop_samples_(void *target_buffer, size_t target_sample_c this->buffer_state = BufferState::fadeout; if(write_samples < target_sample_count) { /* Fill the rest of the buffer with the fadeout content */ - this->pop_samples((char*) target_buffer + write_byte_size, target_sample_count - write_samples); + this->pop_samples_((char*) target_buffer + write_byte_size, target_sample_count - write_samples); } } else { /* We can just normally copy the buffer */ @@ -184,7 +183,7 @@ ssize_t AudioOutputSource::enqueue_samples_(const void *source_buffer, size_t sa this->buffer_state = BufferState::playing; if(sample_count > missing_samples) { /* we've more data to write */ - return this->enqueue_samples((const char*) source_buffer + write_byte_size, sample_count - missing_samples) + write_sample_count; + return 
this->enqueue_samples_((const char*) source_buffer + write_byte_size, sample_count - missing_samples) + write_sample_count; } else { return write_sample_count; } diff --git a/native/serverconnection/src/audio/AudioReframer.cpp b/native/serverconnection/src/audio/AudioReframer.cpp index 85823eb..e8dc086 100644 --- a/native/serverconnection/src/audio/AudioReframer.cpp +++ b/native/serverconnection/src/audio/AudioReframer.cpp @@ -4,52 +4,66 @@ using namespace tc::audio; -InputReframer::InputReframer(size_t channels, size_t frame_size) : _frame_size(frame_size), _channels(channels) { +AudioReframer::AudioReframer(size_t channels, size_t target_frame_size) : frame_size_(target_frame_size), channels_(channels) { this->buffer = nullptr; - this->_buffer_index = 0; + this->buffer_index_ = 0; } -InputReframer::~InputReframer() { +AudioReframer::~AudioReframer() { if(this->buffer) { free(this->buffer); } } -void InputReframer::process(const void *source, size_t samples) { +void AudioReframer::process(const void *source, size_t samples) { if(!this->buffer) { - this->buffer = (float*) malloc(this->_channels * this->_frame_size * sizeof(float)); + this->buffer = (float*) malloc(this->channels_ * this->frame_size_ * sizeof(float)); } assert(this->on_frame); - if(this->_buffer_index > 0) { - if(this->_buffer_index + samples > this->_frame_size) { - auto required = this->_frame_size - this->_buffer_index; - auto length = required * this->_channels * sizeof(float); + if(this->buffer_index_ > 0) { + if(this->buffer_index_ + samples > this->frame_size_) { + auto required = this->frame_size_ - this->buffer_index_; + auto length = required * this->channels_ * sizeof(float); - memcpy((char*) this->buffer + this->_buffer_index * sizeof(float) * this->_channels, source, length); + memcpy((char*) this->buffer + this->buffer_index_ * sizeof(float) * this->channels_, source, length); samples -= required; source = (char*) source + length; - this->on_frame(this->buffer); + 
this->on_frame((float*) this->buffer); } else { - memcpy((char*) this->buffer + this->_buffer_index * sizeof(float) * this->_channels, source, samples * this->_channels * sizeof(float)); - this->_buffer_index += samples; + memcpy((char*) this->buffer + this->buffer_index_ * sizeof(float) * this->channels_, source, samples * this->channels_ * sizeof(float)); + this->buffer_index_ += samples; return; } } auto _on_frame = this->on_frame; - while(samples > this->_frame_size) { + while(samples > this->frame_size_) { if(_on_frame) { - _on_frame(source); + _on_frame((float*) source); } - samples -= this->_frame_size; - source = (char*) source + this->_frame_size * this->_channels * sizeof(float); + samples -= this->frame_size_; + source = (char*) source + this->frame_size_ * this->channels_ * sizeof(float); } if(samples > 0) { - memcpy((char*) this->buffer, source, samples * this->_channels * sizeof(float)); + memcpy((char*) this->buffer, source, samples * this->channels_ * sizeof(float)); } - this->_buffer_index = samples; + this->buffer_index_ = samples; +} + +void AudioReframer::flush() { + if(this->buffer_index_ == 0) { + return; + } + + if(!this->on_flush) { + this->buffer_index_ = 0; + return; + } + + this->on_flush((const float*) this->buffer, this->buffer_index_); + this->buffer_index_ = 0; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/AudioReframer.h b/native/serverconnection/src/audio/AudioReframer.h index aa50898..c26d4bf 100644 --- a/native/serverconnection/src/audio/AudioReframer.h +++ b/native/serverconnection/src/audio/AudioReframer.h @@ -4,22 +4,24 @@ #include namespace tc::audio { - class InputReframer { + class AudioReframer { public: - InputReframer(size_t channels, size_t frame_size); - virtual ~InputReframer(); + AudioReframer(size_t channels, size_t target_frame_size); + virtual ~AudioReframer(); void process(const void* /* source */, size_t /* samples */); + void flush(); - [[nodiscard]] inline size_t channels() const 
{ return this->_channels; } - [[nodiscard]] inline size_t frame_size() const { return this->_frame_size; } + [[nodiscard]] inline size_t channels() const { return this->channels_; } + [[nodiscard]] inline size_t target_size() const { return this->frame_size_; } - std::function on_frame; + std::function on_frame; + std::function on_flush; private: void* buffer; - size_t _buffer_index; + size_t buffer_index_; - size_t _channels; - size_t _frame_size; + size_t channels_; + size_t frame_size_; }; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/codec/Converter.h b/native/serverconnection/src/audio/codec/Converter.h index 5527dd2..dc1e1ea 100644 --- a/native/serverconnection/src/audio/codec/Converter.h +++ b/native/serverconnection/src/audio/codec/Converter.h @@ -6,63 +6,103 @@ #define ssize_t int64_t #endif -namespace tc { - namespace audio { - namespace codec { - namespace type { - enum value { - undefined, +namespace tc::audio::codec { + namespace type { + enum value { + undefined, - /* supported */ - opus, - speex, + /* supported */ + opus, + speex, - /* unsupported */ - flac, - celt - }; + /* unsupported */ + flac, + celt + }; - extern bool supported(value); - } + extern bool supported(value); + } - class Converter { - public: - Converter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */); - virtual ~Converter(); + class Converter { + public: + Converter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */); + virtual ~Converter(); - /* initialize parameters depend on the codec */ - virtual bool valid() = 0; - virtual void finalize() = 0; + /* initialize parameters depend on the codec */ + virtual bool valid() = 0; + virtual void finalize() = 0; - virtual void reset_encoder() = 0; - virtual void reset_decoder() = 0; + virtual void reset_encoder() = 0; + virtual void reset_decoder() = 0; - /** - * @return number of bytes written on success - */ - virtual ssize_t encode(std::string& /* error */, 
const void* /* source */, void* /* destination */, size_t /* destination byte length */, bool /* head package */) = 0; + /** + * @return number of bytes written on success + */ + virtual ssize_t encode(std::string& /* error */, const void* /* source */, void* /* destination */, size_t /* destination byte length */, bool /* head package */) = 0; - /** - * @return number of samples on success - */ - virtual ssize_t decode(std::string& /* error */, const void* /* source */, size_t /* source byte length */, void* /* destination */, bool /* fec decoding */) = 0; - virtual ssize_t decode_lost(std::string& /* error */, size_t /* packets */) = 0; + /** + * @return number of samples on success + */ + virtual ssize_t decode(std::string& /* error */, const void* /* source */, size_t /* source byte length */, void* /* destination */, bool /* fec decoding */) = 0; + virtual ssize_t decode_lost(std::string& /* error */, size_t /* packets */) = 0; - virtual size_t expected_encoded_length(size_t /* sample count */) = 0; - virtual size_t expected_decoded_length(const void* /* source */, size_t /* source byte length */) { - return this->bytes_per_frame(); - } + virtual size_t expected_encoded_length(size_t /* sample count */) = 0; + virtual size_t expected_decoded_length(const void* /* source */, size_t /* source byte length */) { + return this->bytes_per_frame(); + } - inline size_t channels() { return this->_channels; } - inline size_t sample_rate() { return this->_sample_rate; } - inline size_t frame_size() { return this->_frame_size; } + inline size_t channels() { return this->_channels; } + inline size_t sample_rate() { return this->_sample_rate; } + inline size_t frame_size() { return this->_frame_size; } - inline size_t bytes_per_frame() { return this->_channels * this->_frame_size * 4; } - protected: - size_t _frame_size; - size_t _channels; - size_t _sample_rate; - }; - } - } + inline size_t bytes_per_frame() { return this->_channels * this->_frame_size * 4; } + protected: 
+ size_t _frame_size; + size_t _channels; + size_t _sample_rate; + }; + + struct EncoderBufferInfo { + size_t sample_count{0}; + bool head_sequence{false}; + bool flush_encoder{false}; + }; + + /** + * Encoders should only be accessed by one thread at once + */ + class AudioEncoder { + public: + explicit AudioEncoder() = default; + virtual ~AudioEncoder() = default; + + [[nodiscard]] virtual bool valid() const = 0; + [[nodiscard]] virtual bool initialize(std::string& /* error */) = 0; + virtual void reset_sequence() = 0; + + /** + * Get the codec's sample rate. + */ + [[nodiscard]] virtual size_t sample_rate() const = 0; + + /** + * Get the codec's audio channel count. + */ + [[nodiscard]] virtual size_t channel_count() const = 0; + + + /** + * @returns the expected output length. + * If unknown a size near the MTU will be returned. + */ + [[nodiscard]] virtual size_t expected_encoded_length(const float* /* samples */, size_t /* sample count */) const = 0; + + /** + * Encode a chunk of audio data. 
+ * @return `true` on success else `false` + */ + [[nodiscard]] virtual bool encode(std::string& /* error */, void* /* target buffer */, size_t& /* target length */, const EncoderBufferInfo& /* buffer info */, const float* /* samples */) = 0; + }; + + class AudioDecoder {}; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/codec/OpusConverter.cpp b/native/serverconnection/src/audio/codec/OpusConverter.cpp index ec5fc26..74b768a 100644 --- a/native/serverconnection/src/audio/codec/OpusConverter.cpp +++ b/native/serverconnection/src/audio/codec/OpusConverter.cpp @@ -157,4 +157,94 @@ bool OpusConverter::_finalize_encoder(std::string &) { this->encoder = nullptr; } return true; -} \ No newline at end of file +} + +OpusAudioEncoder::OpusAudioEncoder(int application_type) : application_type_{application_type} {} +OpusAudioEncoder::~OpusAudioEncoder() noexcept { + if(this->encoder) { + opus_encoder_destroy(this->encoder); + this->encoder = nullptr; + } +}; + +bool OpusAudioEncoder::valid() const { + return this->encoder != nullptr; +} + +bool OpusAudioEncoder::initialize(string &error) { + int error_id = 0; + this->encoder = opus_encoder_create((opus_int32) this->sample_rate(), (int) this->channel_count(), this->application_type_, &error_id); + if(!this->encoder || error_id) { + error = "failed to create encoder (" + to_string(error_id) + ")"; + goto cleanup_error; + } + + error_id = opus_encoder_ctl(this->encoder, OPUS_SET_BITRATE(64000)); + if(error_id) { + error = "failed to set bitrate (" + to_string(error_id) + ")"; + goto cleanup_error; + } + + error_id = opus_encoder_ctl(this->encoder, OPUS_SET_INBAND_FEC(1)); + if(error_id) { + error = "failed to enable fec (" + to_string(error_id) + ")"; + goto cleanup_error; + } + + error_id = opus_encoder_ctl(this->encoder, OPUS_SET_PACKET_LOSS_PERC(15)); + if(error_id) { + error = "failed to assume a 15% packet loss (" + to_string(error_id) + ")"; + goto cleanup_error; + } + + return true; + + 
cleanup_error: + if(this->encoder) { + opus_encoder_destroy(this->encoder); + this->encoder = nullptr; + } + return false; +} + +void OpusAudioEncoder::reset_sequence() { + auto result = opus_encoder_ctl(this->encoder, OPUS_RESET_STATE); + if(result != OPUS_OK) { + log_warn(category::audio, tr("Failed to reset opus encoder. Opus result: {}"), result); + } +} + +size_t OpusAudioEncoder::sample_rate() const { + return 48000; +} + +size_t OpusAudioEncoder::channel_count() const { + if(this->application_type() == OPUS_APPLICATION_AUDIO) { + return 2; + } else { + return 1; + } +} + +size_t OpusAudioEncoder::expected_encoded_length(const float *, size_t) const { + return 1500; +} + +bool OpusAudioEncoder::encode(std::string& error, void *target_buffer, size_t &target_size, const EncoderBufferInfo &info, const float *source_buffer) { + if(info.sample_count == 0) { + /* flush request but we've no internal buffers */ + target_size = 0; + return true; + } + + /* TODO: Use some calculated variables here provided via EncoderBufferInfo */ + opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(info.head_sequence ? 
100 : 15)); + auto result = opus_encode_float(this->encoder, source_buffer, (int) info.sample_count, (uint8_t*) target_buffer, (opus_int32) target_size); + if(result < OPUS_OK) { + error = to_string(result) + "|" + opus_strerror(result); + return false; + } + + target_size = (size_t) result; + return true; +} diff --git a/native/serverconnection/src/audio/codec/OpusConverter.h b/native/serverconnection/src/audio/codec/OpusConverter.h index 1e3d17d..ce37928 100644 --- a/native/serverconnection/src/audio/codec/OpusConverter.h +++ b/native/serverconnection/src/audio/codec/OpusConverter.h @@ -4,41 +4,63 @@ #include #include -namespace tc { - namespace audio { - namespace codec { - class OpusConverter : public Converter { - public: - OpusConverter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */); - virtual ~OpusConverter(); +namespace tc::audio::codec { + class OpusConverter : public Converter { + public: + OpusConverter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */); + virtual ~OpusConverter(); - bool valid() override; + bool valid() override; - bool initialize(std::string& /* error */, int /* application type */); - void finalize() override; + bool initialize(std::string& /* error */, int /* application type */); + void finalize() override; - void reset_encoder() override; - void reset_decoder() override; + void reset_encoder() override; + void reset_decoder() override; - ssize_t encode(std::string & /* error */, const void * /* source */, void * /* target */, size_t /* target size */, bool /* head package */) override; - ssize_t decode(std::string & /* error */, const void * /* source */, size_t /* source size */, void *pVoid1, bool /* use fec */) override; + ssize_t encode(std::string & /* error */, const void * /* source */, void * /* target */, size_t /* target size */, bool /* head package */) override; + ssize_t decode(std::string & /* error */, const void * /* source */, size_t /* source size */, void *pVoid1, 
bool /* use fec */) override; - ssize_t decode_lost(std::string &string, size_t /* packets */) override; + ssize_t decode_lost(std::string &string, size_t /* packets */) override; - size_t expected_encoded_length(size_t size) override; - private: - std::mutex coder_lock; - OpusDecoder* decoder = nullptr; - OpusEncoder* encoder = nullptr; + size_t expected_encoded_length(size_t size) override; + private: + std::mutex coder_lock; + OpusDecoder* decoder = nullptr; + OpusEncoder* encoder = nullptr; - bool fec_decoder_{false}; - int _application_type = 0; + bool fec_decoder_{false}; + int _application_type = 0; - bool _finalize_encoder(std::string& /* error */); - bool _finalize_decoder(std::string& /* error */); - bool _initialize_encoder(std::string& /* error */); - bool _initialize_decoder(std::string& /* error */); - }; - } - } + bool _finalize_encoder(std::string& /* error */); + bool _finalize_decoder(std::string& /* error */); + bool _initialize_encoder(std::string& /* error */); + bool _initialize_decoder(std::string& /* error */); + }; + + class OpusAudioEncoder : public AudioEncoder { + public: + explicit OpusAudioEncoder(int /* application type */); + ~OpusAudioEncoder() override; + + bool valid() const override; + + bool initialize(std::string &string) override; + + void reset_sequence() override; + + size_t sample_rate() const override; + + size_t channel_count() const override; + + size_t expected_encoded_length(const float *pDouble, size_t size) const override; + + bool encode(std::string&, void *, size_t &, const EncoderBufferInfo &, const float *) override; + + [[nodiscard]] inline auto application_type() const { return this->application_type_; } + + private: + int application_type_; + OpusEncoder* encoder{nullptr}; + }; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/filter/Filter.h b/native/serverconnection/src/audio/filter/Filter.h index 7770ffe..d0fc3d2 100644 --- a/native/serverconnection/src/audio/filter/Filter.h +++ 
b/native/serverconnection/src/audio/filter/Filter.h @@ -3,25 +3,21 @@ #include #include -namespace tc { - namespace audio { - namespace filter { - class Filter { - public: - Filter(size_t channel_count, size_t sample_rate, size_t frame_size) : - _frame_size(frame_size), _sample_rate(sample_rate), _channels(channel_count) {} +namespace tc::audio { + struct AudioInputBufferInfo; - virtual bool process(const void* /* buffer */) = 0; + namespace filter { + class Filter { + public: + Filter(size_t channel_count, size_t sample_rate) : sample_rate_{sample_rate}, channels_{channel_count} {} - inline size_t sample_rate() { return this->_sample_rate; } - inline size_t channels() { return this->_channels; } - inline size_t frame_size() { return this->_frame_size; } + virtual bool process(const AudioInputBufferInfo& /* info */, const float* /* buffer */) = 0; - protected: - size_t _frame_size; - size_t _sample_rate; - size_t _channels; - }; - } + inline size_t sample_rate() { return this->sample_rate_; } + inline size_t channels() { return this->channels_; } + protected: + size_t sample_rate_; + size_t channels_; + }; } } \ No newline at end of file diff --git a/native/serverconnection/src/audio/filter/FilterState.cpp b/native/serverconnection/src/audio/filter/FilterState.cpp index ae54a83..16de34f 100644 --- a/native/serverconnection/src/audio/filter/FilterState.cpp +++ b/native/serverconnection/src/audio/filter/FilterState.cpp @@ -1,17 +1,11 @@ -#include "FilterState.h" -#include "Filter.h" +#include "./FilterState.h" using namespace std; using namespace tc::audio; using namespace tc::audio::filter; -StateFilter::StateFilter(size_t a, size_t b, size_t c) : Filter(a, b, c) {} -StateFilter::~StateFilter() {} +StateFilter::StateFilter(size_t a, size_t b) : Filter{a, b} {} -bool StateFilter::initialize(std::string &) { - return true; -} - -bool StateFilter::process(const void *_buffer) { - return !this->_consume; +bool StateFilter::process(const AudioInputBufferInfo &, const 
float *) { + return !this->consume_; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/filter/FilterState.h b/native/serverconnection/src/audio/filter/FilterState.h index b5501cd..260411f 100644 --- a/native/serverconnection/src/audio/filter/FilterState.h +++ b/native/serverconnection/src/audio/filter/FilterState.h @@ -1,24 +1,20 @@ #pragma once -#include "Filter.h" +#include "./Filter.h" #include -#include namespace tc { namespace audio { namespace filter { class StateFilter : public Filter { public: - StateFilter(size_t /* channel count */, size_t /* sample rate */, size_t /* frame size */); - virtual ~StateFilter(); + StateFilter(size_t /* channel count */, size_t /* sample rate */); + bool process(const AudioInputBufferInfo& /* info */, const float* /* buffer */) override; - bool initialize(std::string& /* error */); - bool process(const void* /* buffer */) override; - - inline bool consumes_input() { return this->_consume; } - inline void set_consume_input(bool state) { this->_consume = state; } + [[nodiscard]] inline auto consumes_input() const { return this->consume_; } + inline void set_consume_input(bool state) { this->consume_ = state; } private: - bool _consume = false; + bool consume_{false}; }; } } diff --git a/native/serverconnection/src/audio/filter/FilterThreshold.cpp b/native/serverconnection/src/audio/filter/FilterThreshold.cpp index a7cb891..8122045 100644 --- a/native/serverconnection/src/audio/filter/FilterThreshold.cpp +++ b/native/serverconnection/src/audio/filter/FilterThreshold.cpp @@ -1,46 +1,44 @@ -#include -#include #include -#include "FilterThreshold.h" +#include "./FilterThreshold.h" +#include "../AudioInput.h" #include "../processing/AudioVolume.h" using namespace std; using namespace tc::audio; using namespace tc::audio::filter; -ThresholdFilter::ThresholdFilter(size_t a, size_t b, size_t c) : Filter(a, b, c) {} +ThresholdFilter::ThresholdFilter(size_t a, size_t b) : Filter(a, b) {} 
ThresholdFilter::~ThresholdFilter() = default; -bool ThresholdFilter::initialize(std::string &, float val, size_t margin) { - this->_threshold = val; - this->_margin_samples = margin; - return true; +void ThresholdFilter::initialize(float threshold, size_t margin) { + this->threshold_ = threshold; + this->margin_samples_ = margin; } -bool ThresholdFilter::process(const void *_buffer) { +bool ThresholdFilter::process(const AudioInputBufferInfo& info, const float* buffer) { auto analyze_callback = this->on_analyze; - float value = audio::audio_buffer_level((const float*) _buffer, this->_channels, this->_frame_size); + float value = audio::audio_buffer_level(buffer, this->channels_, info.sample_count); - auto last_level = this->_current_level; + auto last_level = this->current_level_; float smooth; - if(this->_margin_processed_samples == 0) { + if(this->margin_processed_samples_ == 0) { /* we're in release */ - smooth = this->_release_smooth; + smooth = this->release_smooth_; } else { - smooth = this->_attack_smooth; + smooth = this->attack_smooth_; } - this->_current_level = last_level * smooth + value * (1 - smooth); + this->current_level_ = last_level * smooth + value * (1 - smooth); //log_trace(category::audio, "Vad level: before: {}, edit: {}, now: {}, smooth: {}", last_level, value, this->_current_level, smooth); if(analyze_callback) { - analyze_callback(this->_current_level); + analyze_callback(this->current_level_); } - if(this->_current_level >= this->_threshold) { - this->_margin_processed_samples = 0; + if(this->current_level_ >= this->threshold_) { + this->margin_processed_samples_ = 0; return true; } - return (this->_margin_processed_samples += this->_frame_size) < this->_margin_samples; + return (this->margin_processed_samples_ += info.sample_count) < this->margin_samples_; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/filter/FilterThreshold.h b/native/serverconnection/src/audio/filter/FilterThreshold.h index 
5e19e01..57b6762 100644 --- a/native/serverconnection/src/audio/filter/FilterThreshold.h +++ b/native/serverconnection/src/audio/filter/FilterThreshold.h @@ -1,41 +1,40 @@ #pragma once -#include "Filter.h" +#include "./Filter.h" #include -#include #include namespace tc::audio::filter { class ThresholdFilter : public Filter { public: - ThresholdFilter(size_t /* channel count */, size_t /* sample rate */, size_t /* frame size */); + ThresholdFilter(size_t /* channel count */, size_t /* sample rate */); virtual ~ThresholdFilter(); - bool initialize(std::string& /* error */, float /* threshold */, size_t /* margin frames */); - bool process(const void* /* buffer */) override; + void initialize(float /* threshold */, size_t /* margin frames */); + bool process(const AudioInputBufferInfo& /* info */, const float* /* buffer */) override; - inline void set_threshold(float value) { this->_threshold = value; } - [[nodiscard]] inline float threshold() const { return this->_threshold; } + [[nodiscard]] inline float threshold() const { return this->threshold_; } + inline void set_threshold(float value) { this->threshold_ = value; } /* in seconds */ - [[nodiscard]] inline float margin_release_time() { return (float) this->_margin_samples / (float) this->_sample_rate; } - inline void set_margin_release_time(float value) { this->_margin_samples = (size_t) ceil((float) this->_sample_rate * value); } + [[nodiscard]] inline float margin_release_time() { return (float) this->margin_samples_ / (float) this->sample_rate_; } + inline void set_margin_release_time(float value) { this->margin_samples_ = (size_t) ceil((float) this->sample_rate_ * value); } - [[nodiscard]] inline float attack_smooth() const { return this->_attack_smooth; } - inline void attack_smooth(float value) { this->_attack_smooth = value; } + [[nodiscard]] inline float attack_smooth() const { return this->attack_smooth_; } + inline void attack_smooth(float value) { this->attack_smooth_ = value; } - [[nodiscard]] inline 
float release_smooth() const { return this->_release_smooth; } - inline void release_smooth(float value) { this->_release_smooth = value; } + [[nodiscard]] inline float release_smooth() const { return this->release_smooth_; } + inline void release_smooth(float value) { this->release_smooth_ = value; } std::function on_analyze; private: - float _attack_smooth = 0; - float _release_smooth = 0; - float _current_level = 0; + float attack_smooth_{0}; + float release_smooth_{0}; + float current_level_{0}; - float _threshold{}; + float threshold_{0}; - size_t _margin_samples = 0; - size_t _margin_processed_samples = 0; + size_t margin_samples_{0}; + size_t margin_processed_samples_{0}; }; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/filter/FilterVad.cpp b/native/serverconnection/src/audio/filter/FilterVad.cpp index 5211b65..80d69fa 100644 --- a/native/serverconnection/src/audio/filter/FilterVad.cpp +++ b/native/serverconnection/src/audio/filter/FilterVad.cpp @@ -1,107 +1,134 @@ -#include "FilterVad.h" +#include "./FilterVad.h" #include "../AudioMerger.h" +#include "../AudioInput.h" + +#ifdef USE_FVAD #include "../../logger.h" +#include +#endif using namespace std; using namespace tc::audio; using namespace tc::audio::filter; -VadFilter::VadFilter(size_t channels, size_t rate, size_t frames) : Filter(channels, rate, frames) { } +VadFilter::VadFilter(size_t channels, size_t rate) : Filter{channels, rate} { } +#ifdef USE_FVAD VadFilter::~VadFilter() { - this->cleanup_buffer(); - if(this->_vad_handle) { - fvad_free(this->_vad_handle); - this->_vad_handle = nullptr; + if(this->vad_handle_) { + fvad_free(this->vad_handle_); + this->vad_handle_ = nullptr; } } -void VadFilter::cleanup_buffer() { - lock_guard lock(this->_buffer_lock); - if(this->_buffer) - free(this->_buffer); - this->_buffer = nullptr; - this->_buffer_size = 0; -} - bool VadFilter::initialize(std::string &error, size_t mode, size_t margin) { - - this->_vad_handle = fvad_new(); - 
if(!this->_vad_handle) { + this->vad_handle_ = fvad_new(); + if(!this->vad_handle_) { error = "failed to allocate handle"; return false; } - if(fvad_set_sample_rate(this->_vad_handle, (int) this->_sample_rate) != 0) { + if(fvad_set_sample_rate(this->vad_handle_, (int) this->sample_rate_) != 0) { error = "invalid sample rate. Sample rate must be one of [8000, 16000, 32000 and 48000]"; return false; } - if(fvad_set_mode(this->_vad_handle, (int) mode) != 0) { + if(fvad_set_mode(this->vad_handle_, (int) mode) != 0) { error = "failed to set mode"; return false; } - this->_mode = mode; - this->_margin_samples = margin; - if(this->_channels > 1) { - this->ensure_buffer(this->_frame_size * this->_channels * 4); /* buffer to merge the channels into one channel */ - } else { - this->ensure_buffer(this->_frame_size * 2); - } - + this->mode_ = mode; + this->margin_samples_ = margin; return true; } -bool VadFilter::process(const void *buffer) { - if(!this->_vad_handle) { +std::optional VadFilter::mode() const { + return std::make_optional(this->mode_); +} + +constexpr static auto kMaxStackBufferSamples{1024 * 8}; +bool VadFilter::contains_voice(const AudioInputBufferInfo& info, const float* buffer_) { + if(!this->vad_handle_) { log_warn(category::audio, tr("Vad filter hasn't been initialized!")); return false; } - lock_guard lock(this->_buffer_lock); - if(this->_channels > 1) { - if(!merge::merge_channels_interleaved(this->_buffer, 1, buffer, this->_channels, this->_frame_size)) { + float temp_sample_buffer[kMaxStackBufferSamples]; + if(info.sample_count > kMaxStackBufferSamples) { + log_warn(category::audio, tr("Vad filter received too many samples {}, expected max {}."), info.sample_count, kMaxStackBufferSamples); + return false; + } + + if(this->channels_ > 1) { + if(!merge::merge_channels_interleaved(temp_sample_buffer, 1, buffer_, this->channels_, info.sample_count)) { log_warn(category::audio, tr("Failed to merge channels")); return false; } - buffer = this->_buffer; + + 
buffer_ = temp_sample_buffer; } /* convert float32 samples to signed int16 */ { - auto target = (int16_t*) this->_buffer; - auto source = (float*) buffer; - auto sample = this->_frame_size; + auto target = (int16_t*) temp_sample_buffer; + auto source = buffer_; + auto sample = info.sample_count; float tmp; while(sample-- > 0) { tmp = *source++; tmp *= 32768; - if(tmp > 32767) - tmp = 32767; + if(tmp > 32767) { + tmp = 32767; + } - if(tmp < -32768) - tmp = -32768; + if(tmp < -32768) { + tmp = -32768; + } *target++ = (int16_t) tmp; } } - auto result = fvad_process(this->_vad_handle, (int16_t*) this->_buffer, this->_frame_size); + auto result = fvad_process(this->vad_handle_, (int16_t*) temp_sample_buffer, info.sample_count); if(result == -1) { log_warn(category::audio, tr("Invalid frame length")); return false; } - auto flag_vad = result == 1; + return result == 1; +} +#else +VadFilter::~VadFilter() = default; - if(!flag_vad) { - this->_margin_processed_samples += this->_frame_size; - return this->_margin_processed_samples <= this->_margin_samples; - } else { - this->_margin_processed_samples = 0; - } - return flag_vad; +std::optional VadFilter::mode() const { + (void) this; + return std::nullopt; +} + +bool VadFilter::initialize(std::string &error, size_t mode, size_t margin) { + this->mode_ = mode; + this->margin_samples_ = margin; + return true; +} + +bool VadFilter::contains_voice(const AudioInputBufferInfo &info, const float *) { + (void) this; + return info.vad_detected.value_or(true); +} + +#endif + + +bool VadFilter::process(const AudioInputBufferInfo &info, const float *buffer) { + auto flag_vad{this->contains_voice(info, buffer)}; + if(!flag_vad) { + this->margin_processed_samples_ += info.sample_count; + return this->margin_processed_samples_ <= this->margin_samples_; + } else { + this->margin_processed_samples_ = 0; + } + return flag_vad; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/filter/FilterVad.h 
b/native/serverconnection/src/audio/filter/FilterVad.h index eceb10e..f6c2899 100644 --- a/native/serverconnection/src/audio/filter/FilterVad.h +++ b/native/serverconnection/src/audio/filter/FilterVad.h @@ -1,43 +1,36 @@ #pragma once -#include "Filter.h" +#include "./Filter.h" #include #include -#include +#include + +#ifdef USE_FVAD +struct Fvad; +#endif namespace tc::audio::filter { class VadFilter : public Filter { public: - VadFilter(size_t /* channel count */, size_t /* sample rate */, size_t /* frame size */); + VadFilter(size_t /* channel count */, size_t /* sample rate */); virtual ~VadFilter(); bool initialize(std::string& /* error */, size_t /* mode */, size_t /* margin frames */); - bool process(const void* /* buffer */) override; + bool process(const AudioInputBufferInfo& /* info */, const float* /* buffer */) override; - inline float margin_release_time() { return (float) this->_margin_samples / (float) this->_sample_rate; } - inline void set_margin_release_time(float value) { this->_margin_samples = (size_t) ceil((float) this->_sample_rate * value); } + inline float margin_release_time() { return (float) this->margin_samples_ / (float) this->sample_rate_; } + inline void set_margin_release_time(float value) { this->margin_samples_ = (size_t) ceil((float) this->sample_rate_ * value); } - inline size_t mode() { return this->_mode; } + [[nodiscard]] std::optional mode() const; private: - Fvad* _vad_handle = nullptr; + size_t mode_{0}; + size_t margin_samples_{0}; + size_t margin_processed_samples_{0}; - size_t _mode = 0; - size_t _margin_samples = 0; - size_t _margin_processed_samples = 0; +#ifdef USE_FVAD + Fvad* vad_handle_{nullptr}; +#endif - std::mutex _buffer_lock; - void* _buffer = nullptr; - size_t _buffer_size = 0; - - void cleanup_buffer(); - inline void ensure_buffer(size_t length) { - if(this->_buffer_size < length) { - if(this->_buffer) - free(this->_buffer); - - this->_buffer_size = length; - this->_buffer = malloc(this->_buffer_size); - } - 
} + bool contains_voice(const AudioInputBufferInfo& /* info */, const float* /* buffer */); }; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/js/AudioConsumer.cpp b/native/serverconnection/src/audio/js/AudioConsumer.cpp index 325e170..4eb52ca 100644 --- a/native/serverconnection/src/audio/js/AudioConsumer.cpp +++ b/native/serverconnection/src/audio/js/AudioConsumer.cpp @@ -1,13 +1,11 @@ #include "./AudioConsumer.h" #include "./AudioRecorder.h" #include "./AudioFilter.h" -#include "../AudioInput.h" #include "../filter/Filter.h" #include "../filter/FilterVad.h" #include "../filter/FilterThreshold.h" #include "../filter/FilterState.h" #include "../../logger.h" -#include /* Must be last */ using namespace std; using namespace tc::audio; @@ -41,42 +39,33 @@ NAN_METHOD(AudioConsumerWrapper::NewInstance) { Nan::ThrowError("invalid invoke!"); } -AudioConsumerWrapper::AudioConsumerWrapper(AudioRecorderWrapper* h, const std::shared_ptr &handle) : _handle(handle), _recorder(h) { - log_allocate("AudioConsumerWrapper", this); - { - lock_guard read_lock{handle->on_read_lock}; - handle->on_read = [&](const void* buffer, size_t length){ this->process_data(buffer, length); }; - } +AudioConsumerWrapper::AudioConsumerWrapper(const std::shared_ptr &input) : + sample_rate_{input->sample_rate()}, + channel_count_{input->channel_count()}, + js_queue{} +{ + log_allocate("AudioConsumerWrapper", this); -#ifdef DO_DEADLOCK_REF - this->_recorder->js_ref(); /* FML Mem leak! 
(In general the consumer live is related to the recorder handle, but for nodejs testing we want to keep this reference ) */ -#endif + this->consumer_handle = std::make_shared(); + this->consumer_handle->wrapper = this; + + input->register_consumer(this->consumer_handle); } AudioConsumerWrapper::~AudioConsumerWrapper() { log_free("AudioConsumerWrapper", this); - lock_guard lock{this->execute_mutex}; - this->unbind(); - - for(auto index{0}; index < kInternalFrameBufferCount; index++) { - if(!this->internal_frame_buffer[index]) { continue; } - - free(this->internal_frame_buffer[index]); - this->internal_frame_buffer[index] = nullptr; - this->internal_frame_buffer_size[index] = 0; - } - -#ifdef DO_DEADLOCK_REF - if(this->_recorder) { - this->_recorder->js_unref(); - } -#endif + { + std::lock_guard lock{this->consumer_handle->wrapper_mutex}; + this->consumer_handle->wrapper = nullptr; + } + this->consumer_handle = nullptr; } void AudioConsumerWrapper::do_wrap(const v8::Local &obj) { this->Wrap(obj); +#if 0 this->_call_data = Nan::async_callback([&] { Nan::HandleScope scope; @@ -99,7 +88,7 @@ void AudioConsumerWrapper::do_wrap(const v8::Local &obj) { this->_data_entries.pop_front(); } - const auto byte_length = buffer->sample_count * this->_handle->channel_count * 4; + const auto byte_length = buffer->sample_count * this->channel_count_ * 4; auto js_buffer = v8::ArrayBuffer::New(Nan::GetCurrentContext()->GetIsolate(), byte_length); auto js_fbuffer = v8::Float32Array::New(js_buffer, 0, byte_length / 4); @@ -110,6 +99,7 @@ void AudioConsumerWrapper::do_wrap(const v8::Local &obj) { (void) callback_function.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv); } }); +#endif this->_call_ended = Nan::async_callback([&]{ Nan::HandleScope scope; @@ -131,92 +121,17 @@ void AudioConsumerWrapper::do_wrap(const v8::Local &obj) { (void) callback_function.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr); }); - Nan::DefineOwnProperty(this->handle(), 
Nan::New("frameSize").ToLocalChecked(), Nan::New((uint32_t) this->_handle->frame_size), v8::ReadOnly | v8::DontDelete); - Nan::DefineOwnProperty(this->handle(), Nan::New("sampleRate").ToLocalChecked(), Nan::New((uint32_t) this->_handle->sample_rate), v8::ReadOnly | v8::DontDelete); - Nan::DefineOwnProperty(this->handle(), Nan::New("channelCount").ToLocalChecked(), Nan::New((uint32_t) this->_handle->channel_count), v8::ReadOnly | v8::DontDelete); + Nan::DefineOwnProperty(this->handle(), Nan::New("sampleRate").ToLocalChecked(), Nan::New((uint32_t) this->sample_rate_), v8::ReadOnly | v8::DontDelete); + Nan::DefineOwnProperty(this->handle(), Nan::New("channelCount").ToLocalChecked(), Nan::New((uint32_t) this->channel_count_), v8::ReadOnly | v8::DontDelete); } -void AudioConsumerWrapper::unbind() { - if(this->_handle) { - lock_guard lock{this->_handle->on_read_lock}; - this->_handle->on_read = nullptr; - } -} - -void AudioConsumerWrapper::process_data(const void *buffer, size_t samples) { - if(samples != 960) { - logger::error(logger::category::audio, tr("Received audio frame with invalid sample count (Expected 960, Received {})"), samples); - return; +void AudioConsumerWrapper::delete_consumer() { + if(this->consumer_handle) { + std::lock_guard lock{this->consumer_handle->wrapper_mutex}; + this->consumer_handle->wrapper = nullptr; } - - lock_guard lock{this->execute_mutex}; - if(this->filter_mode_ == FilterMode::BLOCK) { return; } - - bool should_process{true}; - if(this->filter_mode_ == FilterMode::FILTER) { - auto filters = this->filters(); - for(const auto& filter : filters) { - auto _filter = filter->filter(); - if(!_filter) continue; - - if(_filter->frame_size() != samples) { - cerr << "Tried to use a filter, but frame size does not match!" 
<< endl; - continue; - } - if(!_filter->process(buffer)) { - should_process = false; - break; - } - } - } else if(this->filter_mode_ != FilterMode::BYPASS) { - should_process = false; - } - - if(!should_process) { - if(!this->last_consumed) { - this->last_consumed = true; - this->_call_ended(); - unique_lock native_read_lock(this->native_read_callback_lock); - if(this->native_read_callback) { - auto callback = this->native_read_callback; /* copy */ - native_read_lock.unlock(); - callback(nullptr, 0); /* notify end */ - } - } - return; - } - - if(this->last_consumed) { - this->_call_started(); - } - this->last_consumed = false; - - { - unique_lock native_read_lock(this->native_read_callback_lock); - if(this->native_read_callback) { - auto callback = this->native_read_callback; /* copy */ - native_read_lock.unlock(); - - callback(buffer, samples); - return; - } - } - - auto byte_length = samples * this->_handle->channel_count * 4; - auto buf = make_unique(); - buf->buffer = malloc(byte_length); - memcpy(buf->buffer, buffer, byte_length); - buf->sample_count = samples; - - { - lock_guard data_lock{this->_data_lock}; - this->_data_entries.push_back(move(buf)); - } - this->_call_data(); } - - std::shared_ptr AudioConsumerWrapper::create_filter(const std::string& name, const std::shared_ptr &impl) { auto result = shared_ptr(new AudioFilterWrapper(name, impl), [](AudioFilterWrapper* ptr) { assert(v8::Isolate::GetCurrent()); @@ -239,15 +154,17 @@ std::shared_ptr AudioConsumerWrapper::create_filter(const st } void AudioConsumerWrapper::delete_filter(const AudioFilterWrapper* filter) { - shared_ptr handle; /* need to keep the handle 'till everything has been finished */ + std::shared_ptr handle; /* need to keep the handle 'till everything has been finished */ + { - lock_guard lock(this->filter_mutex_); + std::lock_guard lock(this->filter_mutex_); for(auto& c : this->filter_) { if(&*c == filter) { handle = c; break; } } + if(!handle) { return; } @@ -261,20 +178,12 @@ void 
AudioConsumerWrapper::delete_filter(const AudioFilterWrapper* filter) { } { - lock_guard lock(this->execute_mutex); /* ensure that the filter isn't used right now */ + /* ensure that the filter isn't used right now */ + lock_guard lock{this->consumer_handle->wrapper_mutex}; handle->_filter = nullptr; } } -void AudioConsumerWrapper::reserve_internal_buffer(int index, size_t target) { - assert(index < kInternalFrameBufferCount); - if(this->internal_frame_buffer_size[index] < target) { - if(this->internal_frame_buffer_size[index]) { ::free(this->internal_frame_buffer[index]); } - - this->internal_frame_buffer[index] = malloc(target); - this->internal_frame_buffer_size[index] = target; - } -} NAN_METHOD(AudioConsumerWrapper::_get_filters) { auto handle = ObjectWrap::Unwrap(info.Holder()); @@ -308,8 +217,6 @@ NAN_METHOD(AudioConsumerWrapper::_unregister_filter) { NAN_METHOD(AudioConsumerWrapper::_create_filter_vad) { auto handle = ObjectWrap::Unwrap(info.Holder()); - auto consumer = handle->_handle; - assert(consumer); /* should never be null! */ if(info.Length() != 1 || !info[0]->IsNumber()) { Nan::ThrowError("invalid argument"); @@ -317,7 +224,7 @@ NAN_METHOD(AudioConsumerWrapper::_create_filter_vad) { } string error; - auto filter = make_shared(consumer->channel_count,consumer->sample_rate,consumer->frame_size); + auto filter = make_shared(handle->channel_count_, handle->sample_rate_); if(!filter->initialize(error, info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0), 2)) { Nan::ThrowError(Nan::New("failed to initialize filter (" + error + ")").ToLocalChecked()); return; @@ -329,20 +236,14 @@ NAN_METHOD(AudioConsumerWrapper::_create_filter_vad) { NAN_METHOD(AudioConsumerWrapper::_create_filter_threshold) { auto handle = ObjectWrap::Unwrap(info.Holder()); - auto consumer = handle->_handle; - assert(consumer); /* should never be null! 
*/ if(info.Length() != 1 || !info[0]->IsNumber()) { Nan::ThrowError("invalid argument"); return; } - string error; - auto filter = make_shared(consumer->channel_count,consumer->sample_rate,consumer->frame_size); - if(!filter->initialize(error, (float) info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0), 2)) { - Nan::ThrowError(Nan::New("failed to initialize filter (" + error + ")").ToLocalChecked()); - return; - } + auto filter = make_shared(handle->channel_count_, handle->sample_rate_); + filter->initialize((float) info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0), 2); auto object = handle->create_filter("threshold", filter); info.GetReturnValue().Set(object->handle()); @@ -350,16 +251,8 @@ NAN_METHOD(AudioConsumerWrapper::_create_filter_threshold) { NAN_METHOD(AudioConsumerWrapper::_create_filter_state) { auto handle = ObjectWrap::Unwrap(info.Holder()); - auto consumer = handle->_handle; - assert(consumer); /* should never be null! */ - - string error; - auto filter = make_shared(consumer->channel_count, consumer->sample_rate, consumer->frame_size); - if(!filter->initialize(error)) { - Nan::ThrowError(Nan::New("failed to initialize filter (" + error + ")").ToLocalChecked()); - return; - } + auto filter = std::make_shared(handle->channel_count_, handle->sample_rate_); auto object = handle->create_filter("state", filter); info.GetReturnValue().Set(object->handle()); } @@ -379,4 +272,75 @@ NAN_METHOD(AudioConsumerWrapper::_set_filter_mode) { auto value = info[0].As()->Int32Value(info.GetIsolate()->GetCurrentContext()).FromMaybe(0); handle->filter_mode_ = (FilterMode) value; +} + +void AudioConsumerWrapper::InputConsumer::handle_buffer(const AudioInputBufferInfo &info, const float *buffer) { + std::lock_guard lock{this->wrapper_mutex}; + if(!this->wrapper) { + return; + } + + this->wrapper->handle_buffer(info, buffer); +} + + +void AudioConsumerWrapper::handle_buffer(const AudioInputBufferInfo &info, const float *buffer) { + bool should_process; + 
switch(this->filter_mode_) { + + case FilterMode::FILTER: + should_process = true; + for(const auto& filter : this->filters()) { + auto filter_instance = filter->filter(); + if(!filter_instance) continue; + + if(!filter_instance->process(info, buffer)) { + should_process = false; + break; + } + } + break; + + case FilterMode::BYPASS: + should_process = true; + break; + + case FilterMode::BLOCK: + default: + should_process = false; + return; + + } + + if(!should_process) { + if(!this->last_consumed) { + this->last_consumed = true; + this->_call_ended(); + + std::unique_lock native_read_lock(this->native_read_callback_lock); + if(this->native_read_callback) { + auto callback = this->native_read_callback; /* copy */ + native_read_lock.unlock(); + callback(nullptr, 0); /* notify end */ + } + } + } else { + if(this->last_consumed) { + this->last_consumed = false; + this->_call_started(); + } + + { + unique_lock native_read_lock(this->native_read_callback_lock); + if(this->native_read_callback) { + auto callback = this->native_read_callback; /* copy */ + native_read_lock.unlock(); + + callback(buffer, info.sample_count); + return; + } + } + + /* TODO: Callback JavaScript if required */ + } } \ No newline at end of file diff --git a/native/serverconnection/src/audio/js/AudioConsumer.h b/native/serverconnection/src/audio/js/AudioConsumer.h index c633680..7a6eb98 100644 --- a/native/serverconnection/src/audio/js/AudioConsumer.h +++ b/native/serverconnection/src/audio/js/AudioConsumer.h @@ -4,6 +4,8 @@ #include #include #include +#include "../AudioInput.h" +#include "../AudioReframer.h" namespace tc::audio { class AudioInput; @@ -13,6 +15,7 @@ namespace tc::audio { class Filter; } + struct AudioInputBufferInfo; namespace recorder { class AudioFilterWrapper; class AudioRecorderWrapper; @@ -23,6 +26,7 @@ namespace tc::audio { BLOCK }; + /* FIXME: Rechunk audio data to 20ms with the input frame rate (that's what we need for networking stuff) */ class AudioConsumerWrapper : 
public Nan::ObjectWrap { friend class AudioRecorderWrapper; public: @@ -38,7 +42,7 @@ namespace tc::audio { return my_constructor_template; } - AudioConsumerWrapper(AudioRecorderWrapper*, const std::shared_ptr& /* handle */); + AudioConsumerWrapper(const std::shared_ptr& /* input */); ~AudioConsumerWrapper() override; static NAN_METHOD(_get_filters); @@ -59,47 +63,38 @@ namespace tc::audio { return this->filter_; } + [[nodiscard]] inline auto channel_count() const { return this->channel_count_; }; + [[nodiscard]] inline auto sample_rate() const { return this->sample_rate_; }; + [[nodiscard]] inline FilterMode filter_mode() const { return this->filter_mode_; } - inline std::shared_ptr native_consumer() { return this->_handle; } std::mutex native_read_callback_lock; - std::function native_read_callback; + std::function native_read_callback; private: - AudioRecorderWrapper* _recorder; + struct InputConsumer : public AudioInputConsumer { + std::mutex wrapper_mutex{}; + AudioConsumerWrapper* wrapper{nullptr}; - std::mutex execute_mutex; - std::shared_ptr _handle; + void handle_buffer(const AudioInputBufferInfo &, const float *) override; + }; + + std::shared_ptr consumer_handle{}; + + size_t const channel_count_; + size_t const sample_rate_; + + Nan::JavaScriptQueue js_queue; std::mutex filter_mutex_; std::deque> filter_; FilterMode filter_mode_{FilterMode::FILTER}; bool last_consumed = false; - constexpr static auto kInternalFrameBufferCount{2}; - void* internal_frame_buffer[kInternalFrameBufferCount]{nullptr}; - size_t internal_frame_buffer_size[kInternalFrameBufferCount]{0}; - void do_wrap(const v8::Local& /* object */); + void delete_consumer(); - void unbind(); /* called with execute_lock locked */ - void process_data(const void* /* buffer */, size_t /* samples */); + void handle_buffer(const AudioInputBufferInfo& /* info */, const float* /* buffer */); - void reserve_internal_buffer(int /* buffer */, size_t /* bytes */); - - struct DataEntry { - void* buffer = 
nullptr; - size_t sample_count = 0; - - ~DataEntry() { - if(buffer) - free(buffer); - } - }; - - std::mutex _data_lock; - std::deque> _data_entries; - - Nan::callback_t<> _call_data; Nan::callback_t<> _call_ended; Nan::callback_t<> _call_started; }; diff --git a/native/serverconnection/src/audio/js/AudioFilter.cpp b/native/serverconnection/src/audio/js/AudioFilter.cpp index ab4b315..5715376 100644 --- a/native/serverconnection/src/audio/js/AudioFilter.cpp +++ b/native/serverconnection/src/audio/js/AudioFilter.cpp @@ -1,4 +1,6 @@ #include "AudioFilter.h" + +#include #include "../filter/FilterVad.h" #include "../filter/FilterThreshold.h" #include "../filter/FilterState.h" @@ -44,7 +46,7 @@ NAN_METHOD(AudioFilterWrapper::NewInstance) { Nan::ThrowError("invalid invoke!"); } -AudioFilterWrapper::AudioFilterWrapper(const std::string& name, const std::shared_ptr &filter) : _filter(filter), _name(name) { +AudioFilterWrapper::AudioFilterWrapper(std::string name, std::shared_ptr filter) : _filter{std::move(filter)}, _name{std::move(name)} { auto threshold_filter = dynamic_pointer_cast(this->_filter); if(threshold_filter) { this->_call_analyzed = Nan::async_callback([&](float value) { @@ -56,7 +58,7 @@ AudioFilterWrapper::AudioFilterWrapper(const std::string& name, const std::share v8::Local argv[1]; argv[0] = Nan::New(value); - cb->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv); + (void) cb->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv); } }); } @@ -99,7 +101,12 @@ NAN_METHOD(AudioFilterWrapper::_get_level) { return; } - info.GetReturnValue().Set((int) filter->mode()); + auto mode = filter->mode(); + if(mode.has_value()) { + info.GetReturnValue().Set((int) *mode); + } else { + /* We're using the preprocessor config */ + } } diff --git a/native/serverconnection/src/audio/js/AudioFilter.h b/native/serverconnection/src/audio/js/AudioFilter.h index 13b2ee7..1fceca6 100644 --- a/native/serverconnection/src/audio/js/AudioFilter.h +++ 
b/native/serverconnection/src/audio/js/AudioFilter.h @@ -25,7 +25,7 @@ namespace tc::audio { return my_constructor_template; } - AudioFilterWrapper(const std::string& name, const std::shared_ptr& /* handle */); + AudioFilterWrapper(std::string name, std::shared_ptr /* handle */); ~AudioFilterWrapper() override; static NAN_METHOD(_get_name); diff --git a/native/serverconnection/src/audio/js/AudioRecorder.cpp b/native/serverconnection/src/audio/js/AudioRecorder.cpp index f66f66b..693e3c5 100644 --- a/native/serverconnection/src/audio/js/AudioRecorder.cpp +++ b/native/serverconnection/src/audio/js/AudioRecorder.cpp @@ -77,7 +77,7 @@ AudioRecorderWrapper::~AudioRecorderWrapper() { } std::shared_ptr AudioRecorderWrapper::create_consumer() { - auto result = shared_ptr(new AudioConsumerWrapper(this, this->input_->create_consumer(960)), [](AudioConsumerWrapper* ptr) { + auto result = std::shared_ptr(new AudioConsumerWrapper(this->input_), [](AudioConsumerWrapper* ptr) { assert(v8::Isolate::GetCurrent()); ptr->Unref(); }); @@ -120,10 +120,7 @@ void AudioRecorderWrapper::delete_consumer(const AudioConsumerWrapper* consumer) } } - { - lock_guard lock(handle->execute_mutex); /* if we delete the consumer while executing strange stuff could happen */ - handle->unbind(); - } + handle->delete_consumer(); } void AudioRecorderWrapper::do_wrap(const v8::Local &obj) { diff --git a/native/serverconnection/src/connection/ServerConnection.cpp b/native/serverconnection/src/connection/ServerConnection.cpp index 01ea94f..b266dba 100644 --- a/native/serverconnection/src/connection/ServerConnection.cpp +++ b/native/serverconnection/src/connection/ServerConnection.cpp @@ -581,7 +581,9 @@ NAN_METHOD(ServerConnection::send_voice_data_raw) { auto voice_data = info[0].As()->Buffer(); auto vs = this->voice_connection ? 
this->voice_connection->voice_sender() : nullptr; - if(vs) vs->send_data(voice_data->GetContents().Data(), voice_data->GetContents().ByteLength() / (4 * channels), sample_rate, channels); + if(vs) { + vs->send_data((float*) voice_data->GetContents().Data(), voice_data->GetContents().ByteLength() / (4 * channels), sample_rate, channels); + } } #ifdef SHUFFLE_VOICE diff --git a/native/serverconnection/src/connection/audio/AudioSender.cpp b/native/serverconnection/src/connection/audio/AudioSender.cpp index a9dd3c7..cc40644 100644 --- a/native/serverconnection/src/connection/audio/AudioSender.cpp +++ b/native/serverconnection/src/connection/audio/AudioSender.cpp @@ -1,9 +1,10 @@ #include "AudioSender.h" #include "VoiceConnection.h" #include "../ServerConnection.h" -#include "../../logger.h" #include "../../audio/AudioEventLoop.h" #include "../../audio/AudioMerger.h" +#include "../../audio/AudioReframer.h" +#include "../../audio/codec/OpusConverter.h" using namespace std; using namespace tc; @@ -11,129 +12,110 @@ using namespace tc::audio; using namespace tc::audio::codec; using namespace tc::connection; -VoiceSender::VoiceSender(tc::connection::VoiceConnection *handle) : handle(handle) {} +VoiceSender::VoiceSender(tc::connection::VoiceConnection *handle) : handle{handle} {} VoiceSender::~VoiceSender() { + /* Note: We can't be within the event loop since if we were we would have a shared reference*/ audio::encode_event_loop->cancel(dynamic_pointer_cast(this->_ref.lock())); - this->clear_buffer(); /* buffer might be accessed within encode_raw_frame, but this could not be trigered while this will be deallocated! 
*/ -} -bool VoiceSender::initialize_codec(std::string& error, connection::codec::value codec, size_t channels, size_t rate, bool reset_encoder) { - auto& data = this->codec[codec]; - bool new_allocated = !data; - if(new_allocated) { - data = make_unique(); - data->packet_counter = 0; - data->last_packet = chrono::system_clock::now(); - } - - auto info = codec::get_info(codec); - if(!info || !info->supported) { - if(new_allocated) - log_error(category::voice_connection, tr("Tried to send voice packet but we dont support the current codec ({})"), codec); - return false; - } - - if(!data->converter) { - data->converter = info->new_converter(error); - if(!data->converter) - return false; - } else if(reset_encoder) { - data->converter->reset_encoder(); - } - - if(!data->resampler || data->resampler->input_rate() != rate) { - data->resampler = make_shared(rate, data->converter->sample_rate(), data->converter->channels()); - } - - if(!data->resampler->valid()) { - error = "resampler is invalid"; - return false; - } - - return true; + { + lock_guard buffer_lock{this->raw_audio_buffer_mutex}; + while(this->raw_audio_buffers_head) { + auto buffer = std::exchange(this->raw_audio_buffers_head, this->raw_audio_buffers_head->next); + buffer->~AudioFrame(); + ::free(this->raw_audio_buffers_head); + } + this->raw_audio_buffers_tail = &this->raw_audio_buffers_head; + } } void VoiceSender::set_voice_send_enabled(bool flag) { this->voice_send_enabled = flag; } -void VoiceSender::send_data(const void *data, size_t samples, size_t rate, size_t channels) { - unique_lock lock{this->_execute_lock}; - if(!this->handle) { - log_warn(category::voice_connection, tr("Dropping raw audio frame because of an invalid handle.")); - return; - } - lock.unlock(); - +void VoiceSender::send_data(const float *data, size_t samples, size_t rate, size_t channels) { if(!this->voice_send_enabled) { log_warn(category::voice_connection, tr("Dropping raw audio frame because voice sending has been disabled!")); 
return; } - auto frame = make_unique(); + /* aligned for float values */ + const auto aligned_frame_size{((sizeof(AudioFrame) + 3) / sizeof(float)) * sizeof(float)}; + + auto frame = (AudioFrame*) malloc(aligned_frame_size + samples * channels * sizeof(float)); + new (&frame) AudioFrame{}; + + frame->sample_count = samples; frame->sample_rate = rate; frame->channels = channels; - frame->buffer = pipes::buffer{(void*) data, samples * channels * 4}; + + frame->buffer = (float*) frame + aligned_frame_size / sizeof(float); + memcpy(frame->buffer, data, samples * channels * sizeof(float)); + frame->timestamp = chrono::system_clock::now(); { - lock_guard buffer_lock(this->raw_audio_buffer_lock); - this->raw_audio_buffers.push_back(move(frame)); + lock_guard buffer_lock(this->raw_audio_buffer_mutex); + *this->raw_audio_buffers_tail = frame; + this->raw_audio_buffers_tail = &frame->next; } audio::encode_event_loop->schedule(dynamic_pointer_cast(this->_ref.lock())); } void VoiceSender::send_stop() { - unique_lock lock(this->_execute_lock); - if(!this->handle) { - log_warn(category::voice_connection, tr("Dropping audio end frame because of an invalid handle.")); - return; - } - lock.unlock(); - - - auto frame = make_unique(); - frame->sample_rate = 0; - frame->channels = 0; - frame->buffer = pipes::buffer{nullptr, 0}; + auto frame = (AudioFrame*) malloc(sizeof(AudioFrame)); + new (&frame) AudioFrame{}; frame->timestamp = chrono::system_clock::now(); { - lock_guard buffer_lock(this->raw_audio_buffer_lock); - this->raw_audio_buffers.push_back(move(frame)); + lock_guard buffer_lock{this->raw_audio_buffer_mutex}; + *this->raw_audio_buffers_tail = frame; + this->raw_audio_buffers_tail = &frame->next; } audio::encode_event_loop->schedule(dynamic_pointer_cast(this->_ref.lock())); } void VoiceSender::finalize() { - lock_guard lock(this->_execute_lock); + auto execute_lock = this->execute_lock(true); this->handle = nullptr; } +void VoiceSender::set_codec(connection::codec::value 
target_codec) { + this->target_codec_ = target_codec; +} + void VoiceSender::event_execute(const std::chrono::system_clock::time_point &point) { static auto max_time = chrono::milliseconds(10); bool reschedule = false; auto now = chrono::system_clock::now(); while(true) { - unique_lock buffer_lock(this->raw_audio_buffer_lock); - if(this->raw_audio_buffers.empty()) - break; - - if(chrono::system_clock::now() - now > max_time) { - reschedule = true; - break; + std::unique_lock buffer_lock{this->raw_audio_buffer_mutex}; + if(!this->raw_audio_buffers_head) { + break; } - auto entry = move(this->raw_audio_buffers.front()); - this->raw_audio_buffers.pop_front(); + auto next_buffer = std::exchange(this->raw_audio_buffers_head, this->raw_audio_buffers_head->next); + if(!this->raw_audio_buffers_head) { + assert(this->raw_audio_buffers_tail == &next_buffer->next); + this->raw_audio_buffers_tail = &this->raw_audio_buffers_head; + } buffer_lock.unlock(); - //TODO: Drop too old buffers! - this->encode_raw_frame(entry); + //TODO: Drop too old buffers! 
+ + if(this->handle) { + this->encode_raw_frame(next_buffer); + } + + next_buffer->~AudioFrame(); + ::free(next_buffer); + if(chrono::system_clock::now() - now > max_time) { + reschedule = true; + break; + } } if(reschedule) { @@ -142,94 +124,139 @@ void VoiceSender::event_execute(const std::chrono::system_clock::time_point &poi } } -void VoiceSender::encode_raw_frame(const std::unique_ptr &frame) { - auto codec = this->_current_codec; - auto& codec_data = this->codec[codec]; +constexpr static auto kTempBufferMaxSampleCount{1024 * 8}; +void VoiceSender::encode_raw_frame(const AudioFrame* frame) { + if(frame->sample_rate == 0) { + /* Audio sequence end */ + this->audio_sequence_no = 0; + if(this->current_codec_.has_value()) { + this->flush_current_codec(); - bool flag_head = true, flag_reset = true; - if(codec_data) { - if(codec_data->last_packet + chrono::seconds(1) < frame->timestamp) - codec_data->packet_counter = 0; + auto server = this->handle->handle(); + server->send_voice_data(nullptr, 0, *this->current_codec_, false); + } + return; + } - flag_head = codec_data->packet_counter < 5; - flag_reset = codec_data->packet_counter == 0; + if(!this->current_codec_.has_value() || *this->current_codec_ != this->target_codec_) { + this->flush_current_codec(); - codec_data->packet_counter++; - codec_data->last_packet = frame->timestamp; - } + this->audio_sequence_no = 0; + this->codec_resampler = nullptr; + this->codec_reframer = nullptr; + this->codec_encoder = nullptr; + this->current_codec_ = std::make_optional(this->target_codec_); - if(frame->channels == 0 || frame->sample_rate == 0 || frame->buffer.empty()) { - lock_guard lock(this->_execute_lock); - if(!this->handle) { - log_warn(category::voice_connection, tr("Dropping audio end because of an invalid handle.")); - return; - } + std::string error{}; + switch(this->target_codec_) { + case codec::OPUS_VOICE: + this->codec_encoder = std::make_unique(OPUS_APPLICATION_VOIP); + 
if(!this->codec_encoder->initialize(error)) { + log_error(category::voice_connection, tr("Failed to initialize target audio codec {}"), (uint32_t) this->target_codec_); + this->codec_encoder = nullptr; + return; + } + break; - if(codec_data) { - codec_data->packet_counter = 0; - if(auto converter = codec_data->converter; converter) { - log_trace(category::voice_connection, tr("Resetting encoder")); - converter->reset_encoder(); - } - } - auto server = this->handle->handle(); - server->send_voice_data(this->_buffer, 0, codec, flag_head); - return; - } + case codec::OPUS_MUSIC: + this->codec_encoder = std::make_unique(OPUS_APPLICATION_AUDIO); + if(!this->codec_encoder->initialize(error)) { + log_error(category::voice_connection, tr("Failed to initialize target audio codec {}"), (uint32_t) this->target_codec_); + this->codec_encoder = nullptr; + return; + } + break; - string error; - if(flag_reset) { - log_trace(category::voice_connection, tr("Resetting encoder for voice sender")); - } - if(!this->initialize_codec(error, codec, frame->channels, frame->sample_rate, flag_reset)) { - log_error(category::voice_connection, tr("Failed to initialize codec: {}"), error); - return; - } + default: + log_error(category::voice_connection, tr("Unknown target encode codec {}"), (uint32_t) this->target_codec_); + return; + } + } - auto merged_channel_byte_size = codec_data->converter->channels() * (frame->buffer.length() / frame->channels); - auto estimated_resampled_byte_size = codec_data->resampler->estimated_output_size(merged_channel_byte_size); - this->ensure_buffer(max(estimated_resampled_byte_size, merged_channel_byte_size)); + if(!this->codec_encoder) { + /* Codec failed to initialize */ + return; + } - auto codec_channels = codec_data->converter->channels(); - if(!audio::merge::merge_channels_interleaved(this->_buffer, codec_channels, frame->buffer.data_ptr(), frame->channels, frame->buffer.length() / frame->channels / 4)) { - log_warn(category::voice_connection, tr("Failed 
to merge channels to output stream channel count! Dropping local voice packet")); - return; - } + const auto codec_channel_count = this->codec_encoder->channel_count(); + const auto codec_sample_rate = this->codec_encoder->sample_rate(); - auto resampled_samples{estimated_resampled_byte_size / frame->channels / sizeof(float)}; - if(!codec_data->resampler->process(this->_buffer, this->_buffer, merged_channel_byte_size / codec_channels / 4, resampled_samples)) { - log_error(category::voice_connection, tr("Failed to resample buffer."), resampled_samples); - return; - } + float temp_buffer[kTempBufferMaxSampleCount]; + size_t current_sample_count{frame->sample_count}; + float* current_sample_buffer; - if(!resampled_samples) { - /* we don't have any data */ - return; - } + if(frame->channels != codec_channel_count) { + assert(kTempBufferMaxSampleCount >= frame->sample_count * codec_channel_count); + if(!audio::merge::merge_channels_interleaved(temp_buffer, codec_channel_count, frame->buffer, frame->channels, frame->sample_count)) { + log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count! Dropping local voice packet")); + return; + } - if(resampled_samples * codec_channels * 4 != codec_data->converter->bytes_per_frame()) { - log_error(category::voice_connection, - tr("Could not encode audio frame. Frame length is not equal to code frame length! 
Codec: {}, Packet: {}"), - codec_data->converter->bytes_per_frame(), resampled_samples * codec_channels * 4 - ); - return; - } + current_sample_buffer = temp_buffer; + } else { + current_sample_buffer = frame->buffer; + } - char _packet_buffer[512]; - auto encoded_bytes = codec_data->converter->encode(error, this->_buffer, _packet_buffer, 512, flag_head); - if(encoded_bytes <= 0) { - log_error(category::voice_connection, tr("Failed to encode voice: {}"), error); - return; - } + if(frame->sample_rate != codec_sample_rate) { + if(!this->codec_resampler || this->codec_resampler->input_rate() != frame->sample_rate) { + this->codec_resampler = std::make_unique(frame->sample_rate, codec_sample_rate, codec_channel_count); + } - { - lock_guard lock(this->_execute_lock); - if(!this->handle) { - log_warn(category::voice_connection, tr("Dropping audio frame because of an invalid handle.")); - return; - } + size_t resampled_sample_count{this->codec_resampler->estimated_output_size(frame->sample_count)}; + assert(kTempBufferMaxSampleCount >= resampled_sample_count * codec_channel_count); + if(!this->codec_resampler->process(temp_buffer, current_sample_buffer, frame->sample_count, resampled_sample_count)) { + log_error(category::voice_connection, tr("Failed to resample buffer. 
Dropping audio frame")); + return; + } - auto server = this->handle->handle(); - server->send_voice_data(_packet_buffer, encoded_bytes, codec, flag_head); - } + current_sample_buffer = temp_buffer; + current_sample_count = resampled_sample_count; + } + + if(!this->codec_reframer) { + this->codec_reframer = std::make_unique(codec_channel_count, (size_t) (0.02 * codec_sample_rate)); + this->codec_reframer->on_frame = [&](const float* sample_buffer) { + assert(this->codec_reframer); + this->handle_network_frame(sample_buffer, this->codec_reframer->target_size(), false); + }; + this->codec_reframer->on_flush = [&](const float* sample_buffer, size_t sample_count) { + this->handle_network_frame(sample_buffer, sample_count, true); + }; + } + + this->codec_reframer->process(current_sample_buffer, current_sample_count); } + +constexpr static auto kMaxPacketSize{1500}; +void VoiceSender::handle_network_frame(const float *sample_buffer, size_t sample_count, bool is_flush) { + assert(this->current_codec_.has_value()); + assert(this->codec_encoder); + + //log_trace(category::voice_connection, tr("Encoding audio chunk of {}/{} aka {}ms with codec {}"), + // sample_count, this->codec_encoder->sample_rate(), sample_count * 1000 / this->codec_encoder->sample_rate(), *this->current_codec_); + + char packet_buffer[kMaxPacketSize]; + size_t packet_size{kMaxPacketSize}; + + EncoderBufferInfo buffer_info{}; + buffer_info.flush_encoder = is_flush; + buffer_info.sample_count = sample_count; + buffer_info.head_sequence = this->audio_sequence_no++ < 5; + + std::string error{}; + if(!this->codec_encoder->encode(error, packet_buffer, packet_size, buffer_info, sample_buffer)) { + log_error(category::voice_connection, tr("Failed to encode voice: {}"), error); + return; + } + + auto server = this->handle->handle(); + server->send_voice_data(packet_buffer, packet_size, *this->current_codec_, buffer_info.head_sequence); +} + +void VoiceSender::flush_current_codec() { + if(!this->codec_reframer) { 
+ return; + } + + this->codec_reframer->flush(); +} \ No newline at end of file diff --git a/native/serverconnection/src/connection/audio/AudioSender.h b/native/serverconnection/src/connection/audio/AudioSender.h index aa8991e..ff980ce 100644 --- a/native/serverconnection/src/connection/audio/AudioSender.h +++ b/native/serverconnection/src/connection/audio/AudioSender.h @@ -2,14 +2,17 @@ #include #include +#include #include "VoiceClient.h" namespace tc { namespace audio { namespace codec { class Converter; + class AudioEncoder; } + class AudioReframer; class AudioResampler; class AudioOutputSource; } @@ -24,66 +27,54 @@ namespace tc { explicit VoiceSender(VoiceConnection*); virtual ~VoiceSender(); - codec::value get_codec() { return this->_current_codec; } - void set_codec(codec::value value) { this->_current_codec = value; } + void finalize(); + codec::value get_codec() { return this->current_codec_.value_or(this->target_codec_); } + void set_codec(codec::value value); - void finalize(); - void send_data(const void* /* buffer */, size_t /* samples */, size_t /* sample rate */, size_t /* channels */); + void send_data(const float* /* buffer */, size_t /* samples */, size_t /* sample rate */, size_t /* channels */); void send_stop(); void set_voice_send_enabled(bool /* flag */); private: + struct AudioFrame { + AudioFrame* next{nullptr}; + + float* buffer{nullptr}; + size_t sample_count{0}; + size_t sample_rate{0}; + size_t channels{0}; + + std::chrono::system_clock::time_point timestamp{}; + + ~AudioFrame() = default; + AudioFrame() = default; + }; + std::weak_ptr _ref; VoiceConnection* handle; - struct AudioCodec { - size_t packet_counter = 0; - std::chrono::system_clock::time_point last_packet; + std::mutex raw_audio_buffer_mutex{}; + AudioFrame* raw_audio_buffers_head{nullptr}; + AudioFrame** raw_audio_buffers_tail{&this->raw_audio_buffers_head}; - std::shared_ptr converter; - std::shared_ptr resampler; - }; - std::array, codec::MAX + 1> codec{nullptr}; + bool 
voice_send_enabled{false}; - bool initialize_codec(std::string&, codec::value /* codec */, size_t /* channels */, size_t /* source sample rate */, bool /* reset decoder */); + /* Codec specific values */ + codec::value target_codec_{codec::OPUS_VOICE}; + std::optional current_codec_{}; - codec::value _current_codec = codec::OPUS_VOICE; + std::unique_ptr codec_encoder{}; + std::unique_ptr codec_resampler{}; + std::unique_ptr codec_reframer{}; - std::mutex _execute_lock; + size_t audio_sequence_no{0}; - void* _buffer = nullptr; - size_t _buffer_size = 0; - void clear_buffer() { - if(this->_buffer) - ::free(this->_buffer); - this->_buffer = nullptr; - this->_buffer_size = 0; - } + /* Call these methods only when the execute lock has been accquired */ + void encode_raw_frame(const AudioFrame*); + void handle_network_frame(const float* /* buffer */, size_t /* sample count */, bool /* flush */); + void flush_current_codec(); - void ensure_buffer(size_t length) { - if(!this->_buffer || this->_buffer_size < length) { - if(this->_buffer) - ::free(this->_buffer); - this->_buffer = malloc(length); - this->_buffer_size = length; - } - } - - struct AudioFrame { - pipes::buffer buffer; - size_t sample_rate; - size_t channels; - - std::chrono::system_clock::time_point timestamp; - }; - - std::mutex raw_audio_buffer_lock; - std::deque> raw_audio_buffers; - - bool voice_send_enabled = false; - - void encode_raw_frame(const std::unique_ptr&); void event_execute(const std::chrono::system_clock::time_point &point) override; }; } diff --git a/native/serverconnection/src/connection/audio/VoiceConnection.cpp b/native/serverconnection/src/connection/audio/VoiceConnection.cpp index c3a07ac..18106e1 100644 --- a/native/serverconnection/src/connection/audio/VoiceConnection.cpp +++ b/native/serverconnection/src/connection/audio/VoiceConnection.cpp @@ -20,8 +20,8 @@ VoiceConnectionWrap::~VoiceConnectionWrap() { auto old_consumer = this->_voice_recoder_ptr; assert(old_consumer); - lock_guard 
read_lock(old_consumer->native_consumer()->on_read_lock); - old_consumer->native_consumer()->on_read = nullptr; + std::lock_guard read_lock{old_consumer->native_read_callback_lock}; + old_consumer->native_read_callback = nullptr; } } @@ -176,14 +176,12 @@ NAN_METHOD(VoiceConnectionWrap::set_audio_source) { connection->_voice_recoder_ptr = ObjectWrap::Unwrap(info[0]->ToObject(Nan::GetCurrentContext()).ToLocalChecked()); connection->_voice_recoder_handle.Reset(info[0]->ToObject(Nan::GetCurrentContext()).ToLocalChecked()); - auto native_consumer = connection->_voice_recoder_ptr->native_consumer(); - weak_ptr weak_handle = handle; - auto sample_rate = native_consumer->sample_rate; - auto channels = native_consumer->channel_count; + auto sample_rate = connection->_voice_recoder_ptr->sample_rate(); + auto channels = connection->_voice_recoder_ptr->channel_count(); lock_guard read_lock{connection->_voice_recoder_ptr->native_read_callback_lock}; - connection->_voice_recoder_ptr->native_read_callback = [weak_handle, sample_rate, channels](const void* buffer, size_t length) { + connection->_voice_recoder_ptr->native_read_callback = [weak_handle, sample_rate, channels](const float* buffer, size_t sample_count) { auto handle = weak_handle.lock(); if(!handle) { log_warn(category::audio, tr("Missing voice connection handle. Dropping input!")); @@ -192,8 +190,8 @@ NAN_METHOD(VoiceConnectionWrap::set_audio_source) { auto sender = handle->voice_sender(); if(sender) { - if(length > 0 && buffer) { - sender->send_data(buffer, length, sample_rate, channels); + if(sample_count > 0 && buffer) { + sender->send_data(buffer, sample_count, sample_rate, channels); } else { sender->send_stop(); }