Simplified the audio filters and reworked the audio sender logic

This commit is contained in:
WolverinDEV 2021-03-29 11:36:37 +02:00
parent 105f675590
commit 82333000d5
26 changed files with 854 additions and 674 deletions

View File

@ -129,9 +129,6 @@ endif ()
find_package(Soxr REQUIRED)
include_directories(${Soxr_INCLUDE_DIR})
find_package(fvad REQUIRED)
include_directories(${fvad_INCLUDE_DIR})
find_package(Opus REQUIRED)
find_package(spdlog REQUIRED)
find_package(WebRTCAudioProcessing REQUIRED)
@ -147,7 +144,6 @@ set(REQUIRED_LIBRARIES
DataPipes::core::static
${Soxr_LIBRARIES_STATIC}
${fvad_LIBRARIES_STATIC}
opus::static
${Ed25519_LIBRARIES_STATIC}
@ -158,6 +154,14 @@ set(REQUIRED_LIBRARIES
Nan::Helpers
)
# We depricated FVad in favour of WebRTC audio processing and it's VAD detector
if(USE_FVAD)
find_package(fvad REQUIRED)
include_directories(${fvad_INCLUDE_DIR})
list(APPEND REQUIRED_LIBRARIES ${fvad_LIBRARIES_STATIC})
add_compile_definitions(USE_FVAD)
endif()
if (SOUNDIO_BACKED)
list(APPEND REQUIRED_LIBRARIES soundio::static)
else()

View File

@ -225,7 +225,6 @@ export namespace audio {
export interface AudioConsumer {
readonly sampleRate: number;
readonly channelCount: number;
readonly frameSize: number;
get_filters() : ConsumeFilter[];
unregister_filter(filter: ConsumeFilter);

View File

@ -22,7 +22,7 @@ AudioConsumer::AudioConsumer(size_t channel_count, size_t sample_rate, size_t fr
sample_rate{sample_rate},
frame_size{frame_size} {
if(this->frame_size > 0) {
this->reframer = std::make_unique<InputReframer>(channel_count, frame_size);
this->reframer = std::make_unique<AudioReframer>(channel_count, frame_size);
this->reframer->on_frame = [&](const void* buffer) { this->handle_framed_data(buffer, this->frame_size); };
}
}
@ -164,14 +164,14 @@ void AudioInput::stop() {
this->input_recorder.reset();
}
std::vector<std::shared_ptr<AudioConsumer>> AudioInput::consumers() {
std::vector<std::shared_ptr<AudioConsumer>> result{};
std::vector<std::shared_ptr<AudioInputConsumer>> AudioInput::consumers() {
std::vector<std::shared_ptr<AudioInputConsumer>> result{};
result.reserve(10);
std::lock_guard consumer_lock{this->consumers_mutex};
result.reserve(this->consumers_.size());
this->consumers_.erase(std::remove_if(this->consumers_.begin(), this->consumers_.end(), [&](const std::weak_ptr<AudioConsumer>& weak_consumer) {
this->consumers_.erase(std::remove_if(this->consumers_.begin(), this->consumers_.end(), [&](const std::weak_ptr<AudioInputConsumer>& weak_consumer) {
auto consumer = weak_consumer.lock();
if(!consumer) {
return true;
@ -184,13 +184,22 @@ std::vector<std::shared_ptr<AudioConsumer>> AudioInput::consumers() {
return result;
}
std::shared_ptr<AudioConsumer> AudioInput::create_consumer(size_t frame_length) {
auto result = std::shared_ptr<AudioConsumer>(new AudioConsumer(this->channel_count_, this->sample_rate_, frame_length));
{
std::lock_guard lock(this->consumers_mutex);
this->consumers_.push_back(result);
}
return result;
void AudioInput::register_consumer(const std::shared_ptr<AudioInputConsumer>& consumer) {
std::lock_guard lock{this->consumers_mutex};
this->consumers_.push_back(consumer);
}
void AudioInput::remove_consumer(const std::shared_ptr<AudioInputConsumer> &target_consumer) {
std::lock_guard consumer_lock{this->consumers_mutex};
this->consumers_.erase(std::remove_if(this->consumers_.begin(), this->consumers_.end(), [&](const std::weak_ptr<AudioInputConsumer>& weak_consumer) {
auto consumer = weak_consumer.lock();
if(!consumer) {
return true;
}
return consumer == target_consumer;
}), this->consumers_.end());
}
std::shared_ptr<AudioInputAudioLevelMeter> AudioInput::create_level_meter(bool preprocess) {
@ -198,8 +207,8 @@ std::shared_ptr<AudioInputAudioLevelMeter> AudioInput::create_level_meter(bool p
{
std::lock_guard lock{this->consumers_mutex};
auto& list = preprocess ? this->level_meter_preprocess : this->level_meter_postprocess;
list.push_back(level_meter);
auto& level_meters = preprocess ? this->level_meter_preprocess : this->level_meter_postprocess;
level_meters.push_back(level_meter);
}
return level_meter;
@ -295,6 +304,9 @@ void AudioInput::process_audio_chunk(float *chunk) {
assert(memset(temp_sample_buffer, 0, sizeof(float) * kTempSampleBufferSize));
assert(memset(out_sample_buffer, 0, sizeof(float) * kTempSampleBufferSize));
AudioInputBufferInfo buffer_info{};
buffer_info.sample_count = chunk_sample_count;
this->invoke_level_meter(true, chunk, this->channel_count_, chunk_sample_count);
if(auto processor{this->audio_processor_}; processor) {
assert(kTempSampleBufferSize >= chunk_sample_count * this->channel_count_ * sizeof(float));
@ -316,6 +328,10 @@ void AudioInput::process_audio_chunk(float *chunk) {
audio::interleave_vec(out_sample_buffer, channel_ptr, this->channel_count_, chunk_sample_count);
chunk = out_sample_buffer;
if(process_statistics->voice_detected.has_value()) {
buffer_info.vad_detected.emplace(*process_statistics->voice_detected);
}
}
audio::apply_gain(chunk, this->channel_count_, chunk_sample_count, this->volume_);
@ -323,7 +339,7 @@ void AudioInput::process_audio_chunk(float *chunk) {
auto begin = std::chrono::system_clock::now();
for(const auto& consumer : this->consumers()) {
consumer->process_data(out_sample_buffer, chunk_sample_count);
consumer->handle_buffer(buffer_info, out_sample_buffer);
}
auto end = std::chrono::system_clock::now();

View File

@ -5,6 +5,7 @@
#include <memory>
#include <iostream>
#include <functional>
#include <optional>
#include "./AudioSamples.h"
#include "./driver/AudioDriver.h"
#include "../ring_buffer.h"
@ -12,7 +13,7 @@
namespace tc::audio {
class AudioInput;
class InputReframer;
class AudioReframer;
class AudioResampler;
class AudioProcessor;
class AudioInputAudioLevelMeter;
@ -30,12 +31,21 @@ namespace tc::audio {
private:
AudioConsumer(size_t channel_count, size_t sample_rate, size_t frame_size);
std::unique_ptr<InputReframer> reframer;
std::unique_ptr<AudioReframer> reframer;
void process_data(const void* /* buffer */, size_t /* samples */);
void handle_framed_data(const void* /* buffer */, size_t /* samples */);
};
struct AudioInputBufferInfo {
size_t sample_count{0};
std::optional<bool> vad_detected{};
};
struct AudioInputConsumer {
virtual void handle_buffer(const AudioInputBufferInfo& /* info */, const float* /* buffer */) = 0;
};
class AudioInput : public AudioDeviceRecord::Consumer {
public:
AudioInput(size_t /* channels */, size_t /* sample rate */);
@ -49,8 +59,10 @@ namespace tc::audio {
[[nodiscard]] bool recording();
void stop();
[[nodiscard]] std::vector<std::shared_ptr<AudioConsumer>> consumers();
[[nodiscard]] std::shared_ptr<AudioConsumer> create_consumer(size_t /* frame size */);
[[nodiscard]] std::vector<std::shared_ptr<AudioInputConsumer>> consumers();
void register_consumer(const std::shared_ptr<AudioInputConsumer>& /* consumer */);
void remove_consumer(const std::shared_ptr<AudioInputConsumer>& /* consumer */);
[[nodiscard]] std::shared_ptr<AudioInputAudioLevelMeter> create_level_meter(bool /* pre process */);
[[nodiscard]] inline auto audio_processor() { return this->audio_processor_; }
@ -84,25 +96,23 @@ namespace tc::audio {
size_t const sample_rate_;
std::mutex consumers_mutex{};
std::deque<std::weak_ptr<AudioConsumer>> consumers_{};
std::deque<std::weak_ptr<AudioInputConsumer>> consumers_{};
std::deque<std::weak_ptr<AudioInputAudioLevelMeter>> level_meter_preprocess{};
std::deque<std::weak_ptr<AudioInputAudioLevelMeter>> level_meter_postprocess{};
std::recursive_mutex input_source_lock{};
std::shared_ptr<AudioInitializeHook> initialize_hook_handle{};
std::shared_ptr<EventLoopCallback> event_loop_entry{};
std::shared_ptr<AudioProcessor> audio_processor_{};
ring_buffer input_buffer;
std::recursive_mutex input_source_lock{};
std::shared_ptr<AudioDeviceRecord> input_recorder{};
std::unique_ptr<AudioResampler> resampler_{nullptr};
std::shared_ptr<AudioDevice> input_device{};
float volume_{1.f};
std::shared_ptr<AudioDeviceRecord> input_recorder{};
std::shared_ptr<AudioProcessor> audio_processor_{};
std::shared_ptr<AudioInitializeHook> initialize_hook_handle{};
void allocate_input_buffer_samples(size_t /* sample count */);
[[nodiscard]] inline auto chunk_sample_count() const { return (kChunkSizeMs * this->sample_rate_) / 1000; }

View File

@ -55,7 +55,6 @@ void AudioOutputSource::apply_fadein() {
for(size_t index{0}; index < fade_samples; index++) {
const auto offset = (float) ((float) (index + 1) / (float) fade_samples);
const auto volume = std::min(log10f(1 - offset) / -2.71828182845904f, 1.f);
for(int channel{0}; channel < this->channel_count_; channel++) {
*write_ptr++ *= volume;
}
@ -102,7 +101,7 @@ bool AudioOutputSource::pop_samples_(void *target_buffer, size_t target_sample_c
if(auto callback{this->on_underflow}; callback) {
if(callback(missing_samples)) {
/* We've been filled up again. Trying again to fill the output buffer. */
return this->pop_samples(target_buffer, target_sample_count);
return this->pop_samples_(target_buffer, target_sample_count);
}
}
@ -126,7 +125,7 @@ bool AudioOutputSource::pop_samples_(void *target_buffer, size_t target_sample_c
this->buffer_state = BufferState::fadeout;
if(write_samples < target_sample_count) {
/* Fill the rest of the buffer with the fadeout content */
this->pop_samples((char*) target_buffer + write_byte_size, target_sample_count - write_samples);
this->pop_samples_((char*) target_buffer + write_byte_size, target_sample_count - write_samples);
}
} else {
/* We can just normally copy the buffer */
@ -184,7 +183,7 @@ ssize_t AudioOutputSource::enqueue_samples_(const void *source_buffer, size_t sa
this->buffer_state = BufferState::playing;
if(sample_count > missing_samples) {
/* we've more data to write */
return this->enqueue_samples((const char*) source_buffer + write_byte_size, sample_count - missing_samples) + write_sample_count;
return this->enqueue_samples_((const char*) source_buffer + write_byte_size, sample_count - missing_samples) + write_sample_count;
} else {
return write_sample_count;
}

View File

@ -4,52 +4,66 @@
using namespace tc::audio;
InputReframer::InputReframer(size_t channels, size_t frame_size) : _frame_size(frame_size), _channels(channels) {
AudioReframer::AudioReframer(size_t channels, size_t target_frame_size) : frame_size_(target_frame_size), channels_(channels) {
this->buffer = nullptr;
this->_buffer_index = 0;
this->buffer_index_ = 0;
}
InputReframer::~InputReframer() {
AudioReframer::~AudioReframer() {
if(this->buffer) {
free(this->buffer);
}
}
void InputReframer::process(const void *source, size_t samples) {
void AudioReframer::process(const void *source, size_t samples) {
if(!this->buffer) {
this->buffer = (float*) malloc(this->_channels * this->_frame_size * sizeof(float));
this->buffer = (float*) malloc(this->channels_ * this->frame_size_ * sizeof(float));
}
assert(this->on_frame);
if(this->_buffer_index > 0) {
if(this->_buffer_index + samples > this->_frame_size) {
auto required = this->_frame_size - this->_buffer_index;
auto length = required * this->_channels * sizeof(float);
if(this->buffer_index_ > 0) {
if(this->buffer_index_ + samples > this->frame_size_) {
auto required = this->frame_size_ - this->buffer_index_;
auto length = required * this->channels_ * sizeof(float);
memcpy((char*) this->buffer + this->_buffer_index * sizeof(float) * this->_channels, source, length);
memcpy((char*) this->buffer + this->buffer_index_ * sizeof(float) * this->channels_, source, length);
samples -= required;
source = (char*) source + length;
this->on_frame(this->buffer);
this->on_frame((float*) this->buffer);
} else {
memcpy((char*) this->buffer + this->_buffer_index * sizeof(float) * this->_channels, source, samples * this->_channels * sizeof(float));
this->_buffer_index += samples;
memcpy((char*) this->buffer + this->buffer_index_ * sizeof(float) * this->channels_, source, samples * this->channels_ * sizeof(float));
this->buffer_index_ += samples;
return;
}
}
auto _on_frame = this->on_frame;
while(samples > this->_frame_size) {
while(samples > this->frame_size_) {
if(_on_frame) {
_on_frame(source);
_on_frame((float*) source);
}
samples -= this->_frame_size;
source = (char*) source + this->_frame_size * this->_channels * sizeof(float);
samples -= this->frame_size_;
source = (char*) source + this->frame_size_ * this->channels_ * sizeof(float);
}
if(samples > 0) {
memcpy((char*) this->buffer, source, samples * this->_channels * sizeof(float));
memcpy((char*) this->buffer, source, samples * this->channels_ * sizeof(float));
}
this->_buffer_index = samples;
this->buffer_index_ = samples;
}
void AudioReframer::flush() {
if(this->buffer_index_ == 0) {
return;
}
if(!this->on_flush) {
this->buffer_index_ = 0;
return;
}
this->on_flush((const float*) this->buffer, this->buffer_index_);
this->buffer_index_ = 0;
}

View File

@ -4,22 +4,24 @@
#include <cstdio>
namespace tc::audio {
class InputReframer {
class AudioReframer {
public:
InputReframer(size_t channels, size_t frame_size);
virtual ~InputReframer();
AudioReframer(size_t channels, size_t target_frame_size);
virtual ~AudioReframer();
void process(const void* /* source */, size_t /* samples */);
void flush();
[[nodiscard]] inline size_t channels() const { return this->_channels; }
[[nodiscard]] inline size_t frame_size() const { return this->_frame_size; }
[[nodiscard]] inline size_t channels() const { return this->channels_; }
[[nodiscard]] inline size_t target_size() const { return this->frame_size_; }
std::function<void(const void* /* buffer */)> on_frame;
std::function<void(const float* /* buffer */)> on_frame;
std::function<void(const float* /* buffer */, size_t /* sample count */)> on_flush;
private:
void* buffer;
size_t _buffer_index;
size_t buffer_index_;
size_t _channels;
size_t _frame_size;
size_t channels_;
size_t frame_size_;
};
}

View File

@ -6,63 +6,103 @@
#define ssize_t int64_t
#endif
namespace tc {
namespace audio {
namespace codec {
namespace type {
enum value {
undefined,
namespace tc::audio::codec {
namespace type {
enum value {
undefined,
/* supported */
opus,
speex,
/* supported */
opus,
speex,
/* unsupported */
flac,
celt
};
/* unsupported */
flac,
celt
};
extern bool supported(value);
}
extern bool supported(value);
}
class Converter {
public:
Converter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */);
virtual ~Converter();
class Converter {
public:
Converter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */);
virtual ~Converter();
/* initialize parameters depend on the codec */
virtual bool valid() = 0;
virtual void finalize() = 0;
/* initialize parameters depend on the codec */
virtual bool valid() = 0;
virtual void finalize() = 0;
virtual void reset_encoder() = 0;
virtual void reset_decoder() = 0;
virtual void reset_encoder() = 0;
virtual void reset_decoder() = 0;
/**
* @return number of bytes written on success
*/
virtual ssize_t encode(std::string& /* error */, const void* /* source */, void* /* destination */, size_t /* destination byte length */, bool /* head package */) = 0;
/**
* @return number of bytes written on success
*/
virtual ssize_t encode(std::string& /* error */, const void* /* source */, void* /* destination */, size_t /* destination byte length */, bool /* head package */) = 0;
/**
* @return number of samples on success
*/
virtual ssize_t decode(std::string& /* error */, const void* /* source */, size_t /* source byte length */, void* /* destination */, bool /* fec decoding */) = 0;
virtual ssize_t decode_lost(std::string& /* error */, size_t /* packets */) = 0;
/**
* @return number of samples on success
*/
virtual ssize_t decode(std::string& /* error */, const void* /* source */, size_t /* source byte length */, void* /* destination */, bool /* fec decoding */) = 0;
virtual ssize_t decode_lost(std::string& /* error */, size_t /* packets */) = 0;
virtual size_t expected_encoded_length(size_t /* sample count */) = 0;
virtual size_t expected_decoded_length(const void* /* source */, size_t /* source byte length */) {
return this->bytes_per_frame();
}
virtual size_t expected_encoded_length(size_t /* sample count */) = 0;
virtual size_t expected_decoded_length(const void* /* source */, size_t /* source byte length */) {
return this->bytes_per_frame();
}
inline size_t channels() { return this->_channels; }
inline size_t sample_rate() { return this->_sample_rate; }
inline size_t frame_size() { return this->_frame_size; }
inline size_t channels() { return this->_channels; }
inline size_t sample_rate() { return this->_sample_rate; }
inline size_t frame_size() { return this->_frame_size; }
inline size_t bytes_per_frame() { return this->_channels * this->_frame_size * 4; }
protected:
size_t _frame_size;
size_t _channels;
size_t _sample_rate;
};
}
}
inline size_t bytes_per_frame() { return this->_channels * this->_frame_size * 4; }
protected:
size_t _frame_size;
size_t _channels;
size_t _sample_rate;
};
struct EncoderBufferInfo {
size_t sample_count{0};
bool head_sequence{false};
bool flush_encoder{false};
};
/**
* Encoders should only be accessed by one thread at once
*/
class AudioEncoder {
public:
explicit AudioEncoder() = default;
virtual ~AudioEncoder() = default;
[[nodiscard]] virtual bool valid() const = 0;
[[nodiscard]] virtual bool initialize(std::string& /* error */) = 0;
virtual void reset_sequence() = 0;
/**
* Get the codecs sample rate.
*/
[[nodiscard]] virtual size_t sample_rate() const = 0;
/**
* Get the codecs audio channel count.
*/
[[nodiscard]] virtual size_t channel_count() const = 0;
/**
* @returns the expected output length.
* If unknown a size near the MTU will be returned.
*/
[[nodiscard]] virtual size_t expected_encoded_length(const float* /* samples */, size_t /* sample count */) const = 0;
/**
* Encode a chunk of audio data.
* @return `true` on success else `false`
*/
[[nodiscard]] virtual bool encode(std::string& /* error */, void* /* target buffer */, size_t& /* target length */, const EncoderBufferInfo& /* buffer info */, const float* /* samples */) = 0;
};
class AudioDecoder {};
}

View File

@ -157,4 +157,94 @@ bool OpusConverter::_finalize_encoder(std::string &) {
this->encoder = nullptr;
}
return true;
}
}
OpusAudioEncoder::OpusAudioEncoder(int application_type) : application_type_{application_type} {}
OpusAudioEncoder::~OpusAudioEncoder() noexcept {
if(this->encoder) {
opus_encoder_destroy(this->encoder);
this->encoder = nullptr;
}
};
bool OpusAudioEncoder::valid() const {
return this->encoder != nullptr;
}
bool OpusAudioEncoder::initialize(string &error) {
int error_id = 0;
this->encoder = opus_encoder_create((opus_int32) this->sample_rate(), (int) this->channel_count(), this->application_type_, &error_id);
if(!this->encoder || error_id) {
error = "failed to create encoder (" + to_string(error_id) + ")";
goto cleanup_error;
}
error_id = opus_encoder_ctl(this->encoder, OPUS_SET_BITRATE(64000));
if(error_id) {
error = "failed to set bitrate (" + to_string(error_id) + ")";
goto cleanup_error;
}
error_id = opus_encoder_ctl(this->encoder, OPUS_SET_INBAND_FEC(1));
if(error_id) {
error = "failed to enable fec (" + to_string(error_id) + ")";
goto cleanup_error;
}
error_id = opus_encoder_ctl(this->encoder, OPUS_SET_PACKET_LOSS_PERC(15));
if(error_id) {
error = "failed to assume a 15% packet loss (" + to_string(error_id) + ")";
goto cleanup_error;
}
return true;
cleanup_error:
if(this->encoder) {
opus_encoder_destroy(this->encoder);
this->encoder = nullptr;
}
return false;
}
void OpusAudioEncoder::reset_sequence() {
auto result = opus_encoder_ctl(this->encoder, OPUS_RESET_STATE);
if(result != OPUS_OK) {
log_warn(category::audio, tr("Failed to reset opus encoder. Opus result: {}"), result);
}
}
size_t OpusAudioEncoder::sample_rate() const {
return 48000;
}
size_t OpusAudioEncoder::channel_count() const {
if(this->application_type() == OPUS_APPLICATION_AUDIO) {
return 2;
} else {
return 1;
}
}
size_t OpusAudioEncoder::expected_encoded_length(const float *, size_t) const {
return 1500;
}
bool OpusAudioEncoder::encode(std::string& error, void *target_buffer, size_t &target_size, const EncoderBufferInfo &info, const float *source_buffer) {
if(info.sample_count == 0) {
/* flush request but we've no internal buffers */
target_size = 0;
return true;
}
/* TODO: Use some calculated variables here provided via EncoderBufferInfo */
opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(info.head_sequence ? 100 : 15));
auto result = opus_encode_float(this->encoder, source_buffer, (int) info.sample_count, (uint8_t*) target_buffer, (opus_int32) target_size);
if(result < OPUS_OK) {
error = to_string(result) + "|" + opus_strerror(result);
return false;
}
target_size = (size_t) result;
return true;
}

View File

@ -4,41 +4,63 @@
#include <opus/opus.h>
#include <mutex>
namespace tc {
namespace audio {
namespace codec {
class OpusConverter : public Converter {
public:
OpusConverter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */);
virtual ~OpusConverter();
namespace tc::audio::codec {
class OpusConverter : public Converter {
public:
OpusConverter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */);
virtual ~OpusConverter();
bool valid() override;
bool valid() override;
bool initialize(std::string& /* error */, int /* application type */);
void finalize() override;
bool initialize(std::string& /* error */, int /* application type */);
void finalize() override;
void reset_encoder() override;
void reset_decoder() override;
void reset_encoder() override;
void reset_decoder() override;
ssize_t encode(std::string & /* error */, const void * /* source */, void * /* target */, size_t /* target size */, bool /* head package */) override;
ssize_t decode(std::string & /* error */, const void * /* source */, size_t /* source size */, void *pVoid1, bool /* use fec */) override;
ssize_t encode(std::string & /* error */, const void * /* source */, void * /* target */, size_t /* target size */, bool /* head package */) override;
ssize_t decode(std::string & /* error */, const void * /* source */, size_t /* source size */, void *pVoid1, bool /* use fec */) override;
ssize_t decode_lost(std::string &string, size_t /* packets */) override;
ssize_t decode_lost(std::string &string, size_t /* packets */) override;
size_t expected_encoded_length(size_t size) override;
private:
std::mutex coder_lock;
OpusDecoder* decoder = nullptr;
OpusEncoder* encoder = nullptr;
size_t expected_encoded_length(size_t size) override;
private:
std::mutex coder_lock;
OpusDecoder* decoder = nullptr;
OpusEncoder* encoder = nullptr;
bool fec_decoder_{false};
int _application_type = 0;
bool fec_decoder_{false};
int _application_type = 0;
bool _finalize_encoder(std::string& /* error */);
bool _finalize_decoder(std::string& /* error */);
bool _initialize_encoder(std::string& /* error */);
bool _initialize_decoder(std::string& /* error */);
};
}
}
bool _finalize_encoder(std::string& /* error */);
bool _finalize_decoder(std::string& /* error */);
bool _initialize_encoder(std::string& /* error */);
bool _initialize_decoder(std::string& /* error */);
};
class OpusAudioEncoder : public AudioEncoder {
public:
explicit OpusAudioEncoder(int /* application type */);
~OpusAudioEncoder() override;
bool valid() const override;
bool initialize(std::string &string) override;
void reset_sequence() override;
size_t sample_rate() const override;
size_t channel_count() const override;
size_t expected_encoded_length(const float *pDouble, size_t size) const override;
bool encode(std::string&, void *, size_t &, const EncoderBufferInfo &, const float *) override;
[[nodiscard]] inline auto application_type() const { return this->application_type_; }
private:
int application_type_;
OpusEncoder* encoder{nullptr};
};
}

View File

@ -3,25 +3,21 @@
#include <cstdint>
#include <cstddef>
namespace tc {
namespace audio {
namespace filter {
class Filter {
public:
Filter(size_t channel_count, size_t sample_rate, size_t frame_size) :
_frame_size(frame_size), _sample_rate(sample_rate), _channels(channel_count) {}
namespace tc::audio {
struct AudioInputBufferInfo;
virtual bool process(const void* /* buffer */) = 0;
namespace filter {
class Filter {
public:
Filter(size_t channel_count, size_t sample_rate) : sample_rate_{sample_rate}, channels_{channel_count} {}
inline size_t sample_rate() { return this->_sample_rate; }
inline size_t channels() { return this->_channels; }
inline size_t frame_size() { return this->_frame_size; }
virtual bool process(const AudioInputBufferInfo& /* info */, const float* /* buffer */) = 0;
protected:
size_t _frame_size;
size_t _sample_rate;
size_t _channels;
};
}
inline size_t sample_rate() { return this->sample_rate_; }
inline size_t channels() { return this->channels_; }
protected:
size_t sample_rate_;
size_t channels_;
};
}
}

View File

@ -1,17 +1,11 @@
#include "FilterState.h"
#include "Filter.h"
#include "./FilterState.h"
using namespace std;
using namespace tc::audio;
using namespace tc::audio::filter;
StateFilter::StateFilter(size_t a, size_t b, size_t c) : Filter(a, b, c) {}
StateFilter::~StateFilter() {}
StateFilter::StateFilter(size_t a, size_t b) : Filter{a, b} {}
bool StateFilter::initialize(std::string &) {
return true;
}
bool StateFilter::process(const void *_buffer) {
return !this->_consume;
bool StateFilter::process(const AudioInputBufferInfo &, const float *) {
return !this->consume_;
}

View File

@ -1,24 +1,20 @@
#pragma once
#include "Filter.h"
#include "./Filter.h"
#include <mutex>
#include <fvad.h>
namespace tc {
namespace audio {
namespace filter {
class StateFilter : public Filter {
public:
StateFilter(size_t /* channel count */, size_t /* sample rate */, size_t /* frame size */);
virtual ~StateFilter();
StateFilter(size_t /* channel count */, size_t /* sample rate */);
bool process(const AudioInputBufferInfo& /* info */, const float* /* buffer */) override;
bool initialize(std::string& /* error */);
bool process(const void* /* buffer */) override;
inline bool consumes_input() { return this->_consume; }
inline void set_consume_input(bool state) { this->_consume = state; }
[[nodiscard]] inline auto consumes_input() const { return this->consume_; }
inline void set_consume_input(bool state) { this->consume_ = state; }
private:
bool _consume = false;
bool consume_{false};
};
}
}

View File

@ -1,46 +1,44 @@
#include <iostream>
#include <algorithm>
#include <cmath>
#include "FilterThreshold.h"
#include "./FilterThreshold.h"
#include "../AudioInput.h"
#include "../processing/AudioVolume.h"
using namespace std;
using namespace tc::audio;
using namespace tc::audio::filter;
ThresholdFilter::ThresholdFilter(size_t a, size_t b, size_t c) : Filter(a, b, c) {}
ThresholdFilter::ThresholdFilter(size_t a, size_t b) : Filter(a, b) {}
ThresholdFilter::~ThresholdFilter() = default;
bool ThresholdFilter::initialize(std::string &, float val, size_t margin) {
this->_threshold = val;
this->_margin_samples = margin;
return true;
void ThresholdFilter::initialize(float threshold, size_t margin) {
this->threshold_ = threshold;
this->margin_samples_ = margin;
}
bool ThresholdFilter::process(const void *_buffer) {
bool ThresholdFilter::process(const AudioInputBufferInfo& info, const float* buffer) {
auto analyze_callback = this->on_analyze;
float value = audio::audio_buffer_level((const float*) _buffer, this->_channels, this->_frame_size);
float value = audio::audio_buffer_level(buffer, this->channels_, info.sample_count);
auto last_level = this->_current_level;
auto last_level = this->current_level_;
float smooth;
if(this->_margin_processed_samples == 0) {
if(this->margin_processed_samples_ == 0) {
/* we're in release */
smooth = this->_release_smooth;
smooth = this->release_smooth_;
} else {
smooth = this->_attack_smooth;
smooth = this->attack_smooth_;
}
this->_current_level = last_level * smooth + value * (1 - smooth);
this->current_level_ = last_level * smooth + value * (1 - smooth);
//log_trace(category::audio, "Vad level: before: {}, edit: {}, now: {}, smooth: {}", last_level, value, this->_current_level, smooth);
if(analyze_callback) {
analyze_callback(this->_current_level);
analyze_callback(this->current_level_);
}
if(this->_current_level >= this->_threshold) {
this->_margin_processed_samples = 0;
if(this->current_level_ >= this->threshold_) {
this->margin_processed_samples_ = 0;
return true;
}
return (this->_margin_processed_samples += this->_frame_size) < this->_margin_samples;
return (this->margin_processed_samples_ += info.sample_count) < this->margin_samples_;
}

View File

@ -1,41 +1,40 @@
#pragma once
#include "Filter.h"
#include "./Filter.h"
#include <mutex>
#include <fvad.h>
#include <functional>
namespace tc::audio::filter {
class ThresholdFilter : public Filter {
public:
ThresholdFilter(size_t /* channel count */, size_t /* sample rate */, size_t /* frame size */);
ThresholdFilter(size_t /* channel count */, size_t /* sample rate */);
virtual ~ThresholdFilter();
bool initialize(std::string& /* error */, float /* threshold */, size_t /* margin frames */);
bool process(const void* /* buffer */) override;
void initialize(float /* threshold */, size_t /* margin frames */);
bool process(const AudioInputBufferInfo& /* info */, const float* /* buffer */) override;
inline void set_threshold(float value) { this->_threshold = value; }
[[nodiscard]] inline float threshold() const { return this->_threshold; }
[[nodiscard]] inline float threshold() const { return this->threshold_; }
inline void set_threshold(float value) { this->threshold_ = value; }
/* in seconds */
[[nodiscard]] inline float margin_release_time() { return (float) this->_margin_samples / (float) this->_sample_rate; }
inline void set_margin_release_time(float value) { this->_margin_samples = (size_t) ceil((float) this->_sample_rate * value); }
[[nodiscard]] inline float margin_release_time() { return (float) this->margin_samples_ / (float) this->sample_rate_; }
inline void set_margin_release_time(float value) { this->margin_samples_ = (size_t) ceil((float) this->sample_rate_ * value); }
[[nodiscard]] inline float attack_smooth() const { return this->_attack_smooth; }
inline void attack_smooth(float value) { this->_attack_smooth = value; }
[[nodiscard]] inline float attack_smooth() const { return this->attack_smooth_; }
inline void attack_smooth(float value) { this->attack_smooth_ = value; }
[[nodiscard]] inline float release_smooth() const { return this->_release_smooth; }
inline void release_smooth(float value) { this->_release_smooth = value; }
[[nodiscard]] inline float release_smooth() const { return this->release_smooth_; }
inline void release_smooth(float value) { this->release_smooth_ = value; }
std::function<void(float)> on_analyze;
private:
float _attack_smooth = 0;
float _release_smooth = 0;
float _current_level = 0;
float attack_smooth_{0};
float release_smooth_{0};
float current_level_{0};
float _threshold{};
float threshold_{0};
size_t _margin_samples = 0;
size_t _margin_processed_samples = 0;
size_t margin_samples_{0};
size_t margin_processed_samples_{0};
};
}

View File

@ -1,107 +1,134 @@
#include "FilterVad.h"
#include "./FilterVad.h"
#include "../AudioMerger.h"
#include "../AudioInput.h"
#ifdef USE_FVAD
#include "../../logger.h"
#include <fvad.h>
#endif
using namespace std;
using namespace tc::audio;
using namespace tc::audio::filter;
VadFilter::VadFilter(size_t channels, size_t rate, size_t frames) : Filter(channels, rate, frames) { }
VadFilter::VadFilter(size_t channels, size_t rate) : Filter{channels, rate} { }
#ifdef USE_FVAD
VadFilter::~VadFilter() {
this->cleanup_buffer();
if(this->_vad_handle) {
fvad_free(this->_vad_handle);
this->_vad_handle = nullptr;
if(this->vad_handle_) {
fvad_free(this->vad_handle_);
this->vad_handle_ = nullptr;
}
}
void VadFilter::cleanup_buffer() {
lock_guard lock(this->_buffer_lock);
if(this->_buffer)
free(this->_buffer);
this->_buffer = nullptr;
this->_buffer_size = 0;
}
bool VadFilter::initialize(std::string &error, size_t mode, size_t margin) {
this->_vad_handle = fvad_new();
if(!this->_vad_handle) {
this->vad_handle_ = fvad_new();
if(!this->vad_handle_) {
error = "failed to allocate handle";
return false;
}
if(fvad_set_sample_rate(this->_vad_handle, (int) this->_sample_rate) != 0) {
if(fvad_set_sample_rate(this->vad_handle_, (int) this->sample_rate_) != 0) {
error = "invalid sample rate. Sample rate must be one of [8000, 16000, 32000 and 48000]";
return false;
}
if(fvad_set_mode(this->_vad_handle, (int) mode) != 0) {
if(fvad_set_mode(this->vad_handle_, (int) mode) != 0) {
error = "failed to set mode";
return false;
}
this->_mode = mode;
this->_margin_samples = margin;
if(this->_channels > 1) {
this->ensure_buffer(this->_frame_size * this->_channels * 4); /* buffer to merge the channels into one channel */
} else {
this->ensure_buffer(this->_frame_size * 2);
}
this->mode_ = mode;
this->margin_samples_ = margin;
return true;
}
bool VadFilter::process(const void *buffer) {
if(!this->_vad_handle) {
std::optional<size_t> VadFilter::mode() const {
return std::make_optional(this->mode_);
}
constexpr static auto kMaxStackBufferSamples{1024 * 8};
bool VadFilter::contains_voice(const AudioInputBufferInfo& info, const float* buffer_) {
if(!this->vad_handle_) {
log_warn(category::audio, tr("Vad filter hasn't been initialized!"));
return false;
}
lock_guard lock(this->_buffer_lock);
if(this->_channels > 1) {
if(!merge::merge_channels_interleaved(this->_buffer, 1, buffer, this->_channels, this->_frame_size)) {
float temp_sample_buffer[kMaxStackBufferSamples];
if(info.sample_count > kMaxStackBufferSamples) {
log_warn(category::audio, tr("Vad filter received too many samples {}, expected max {}."), info.sample_count, kMaxStackBufferSamples);
return false;
}
if(this->channels_ > 1) {
if(!merge::merge_channels_interleaved(temp_sample_buffer, 1, buffer_, this->channels_, info.sample_count)) {
log_warn(category::audio, tr("Failed to merge channels"));
return false;
}
buffer = this->_buffer;
buffer_ = temp_sample_buffer;
}
/* convert float32 samples to signed int16 */
{
auto target = (int16_t*) this->_buffer;
auto source = (float*) buffer;
auto sample = this->_frame_size;
auto target = (int16_t*) temp_sample_buffer;
auto source = buffer_;
auto sample = info.sample_count;
float tmp;
while(sample-- > 0) {
tmp = *source++;
tmp *= 32768;
if(tmp > 32767)
tmp = 32767;
if(tmp > 32767) {
tmp = 32767;
}
if(tmp < -32768)
tmp = -32768;
if(tmp < -32768) {
tmp = -32768;
}
*target++ = (int16_t) tmp;
}
}
auto result = fvad_process(this->_vad_handle, (int16_t*) this->_buffer, this->_frame_size);
auto result = fvad_process(this->vad_handle_, (int16_t*) temp_sample_buffer, info.sample_count);
if(result == -1) {
log_warn(category::audio, tr("Invalid frame length"));
return false;
}
auto flag_vad = result == 1;
return result == 1;
}
#else
VadFilter::~VadFilter() = default;
if(!flag_vad) {
this->_margin_processed_samples += this->_frame_size;
return this->_margin_processed_samples <= this->_margin_samples;
} else {
this->_margin_processed_samples = 0;
}
return flag_vad;
std::optional<size_t> VadFilter::mode() const {
(void) this;
return std::nullopt;
}
bool VadFilter::initialize(std::string &error, size_t mode, size_t margin) {
this->mode_ = mode;
this->margin_samples_ = margin;
return true;
}
bool VadFilter::contains_voice(const AudioInputBufferInfo &info, const float *) {
(void) this;
return info.vad_detected.value_or(true);
}
#endif
bool VadFilter::process(const AudioInputBufferInfo &info, const float *buffer) {
auto flag_vad{this->contains_voice(info, buffer)};
if(!flag_vad) {
this->margin_processed_samples_ += info.sample_count;
return this->margin_processed_samples_ <= this->margin_samples_;
} else {
this->margin_processed_samples_ = 0;
}
return flag_vad;
}

View File

@ -1,43 +1,36 @@
#pragma once
#include "Filter.h"
#include "./Filter.h"
#include <cmath>
#include <mutex>
#include <fvad.h>
#include <optional>
#ifdef USE_FVAD
struct Fvad;
#endif
namespace tc::audio::filter {
class VadFilter : public Filter {
public:
VadFilter(size_t /* channel count */, size_t /* sample rate */, size_t /* frame size */);
VadFilter(size_t /* channel count */, size_t /* sample rate */);
virtual ~VadFilter();
bool initialize(std::string& /* error */, size_t /* mode */, size_t /* margin frames */);
bool process(const void* /* buffer */) override;
bool process(const AudioInputBufferInfo& /* info */, const float* /* buffer */) override;
inline float margin_release_time() { return (float) this->_margin_samples / (float) this->_sample_rate; }
inline void set_margin_release_time(float value) { this->_margin_samples = (size_t) ceil((float) this->_sample_rate * value); }
inline float margin_release_time() { return (float) this->margin_samples_ / (float) this->sample_rate_; }
inline void set_margin_release_time(float value) { this->margin_samples_ = (size_t) ceil((float) this->sample_rate_ * value); }
inline size_t mode() { return this->_mode; }
[[nodiscard]] std::optional<size_t> mode() const;
private:
Fvad* _vad_handle = nullptr;
size_t mode_{0};
size_t margin_samples_{0};
size_t margin_processed_samples_{0};
size_t _mode = 0;
size_t _margin_samples = 0;
size_t _margin_processed_samples = 0;
#ifdef USE_FVAD
Fvad* vad_handle_{nullptr};
#endif
std::mutex _buffer_lock;
void* _buffer = nullptr;
size_t _buffer_size = 0;
void cleanup_buffer();
inline void ensure_buffer(size_t length) {
if(this->_buffer_size < length) {
if(this->_buffer)
free(this->_buffer);
this->_buffer_size = length;
this->_buffer = malloc(this->_buffer_size);
}
}
bool contains_voice(const AudioInputBufferInfo& /* info */, const float* /* buffer */);
};
}

View File

@ -1,13 +1,11 @@
#include "./AudioConsumer.h"
#include "./AudioRecorder.h"
#include "./AudioFilter.h"
#include "../AudioInput.h"
#include "../filter/Filter.h"
#include "../filter/FilterVad.h"
#include "../filter/FilterThreshold.h"
#include "../filter/FilterState.h"
#include "../../logger.h"
#include <rnnoise.h> /* Must be last */
using namespace std;
using namespace tc::audio;
@ -41,42 +39,33 @@ NAN_METHOD(AudioConsumerWrapper::NewInstance) {
Nan::ThrowError("invalid invoke!");
}
AudioConsumerWrapper::AudioConsumerWrapper(AudioRecorderWrapper* h, const std::shared_ptr<tc::audio::AudioConsumer> &handle) : _handle(handle), _recorder(h) {
log_allocate("AudioConsumerWrapper", this);
{
lock_guard read_lock{handle->on_read_lock};
handle->on_read = [&](const void* buffer, size_t length){ this->process_data(buffer, length); };
}
AudioConsumerWrapper::AudioConsumerWrapper(const std::shared_ptr<AudioInput> &input) :
sample_rate_{input->sample_rate()},
channel_count_{input->channel_count()},
js_queue{}
{
log_allocate("AudioConsumerWrapper", this);
#ifdef DO_DEADLOCK_REF
this->_recorder->js_ref(); /* FML Mem leak! (In general the consumer live is related to the recorder handle, but for nodejs testing we want to keep this reference ) */
#endif
this->consumer_handle = std::make_shared<InputConsumer>();
this->consumer_handle->wrapper = this;
input->register_consumer(this->consumer_handle);
}
AudioConsumerWrapper::~AudioConsumerWrapper() {
log_free("AudioConsumerWrapper", this);
lock_guard lock{this->execute_mutex};
this->unbind();
for(auto index{0}; index < kInternalFrameBufferCount; index++) {
if(!this->internal_frame_buffer[index]) { continue; }
free(this->internal_frame_buffer[index]);
this->internal_frame_buffer[index] = nullptr;
this->internal_frame_buffer_size[index] = 0;
}
#ifdef DO_DEADLOCK_REF
if(this->_recorder) {
this->_recorder->js_unref();
}
#endif
{
std::lock_guard lock{this->consumer_handle->wrapper_mutex};
this->consumer_handle->wrapper = nullptr;
}
this->consumer_handle = nullptr;
}
void AudioConsumerWrapper::do_wrap(const v8::Local<v8::Object> &obj) {
this->Wrap(obj);
#if 0
this->_call_data = Nan::async_callback([&] {
Nan::HandleScope scope;
@ -99,7 +88,7 @@ void AudioConsumerWrapper::do_wrap(const v8::Local<v8::Object> &obj) {
this->_data_entries.pop_front();
}
const auto byte_length = buffer->sample_count * this->_handle->channel_count * 4;
const auto byte_length = buffer->sample_count * this->channel_count_ * 4;
auto js_buffer = v8::ArrayBuffer::New(Nan::GetCurrentContext()->GetIsolate(), byte_length);
auto js_fbuffer = v8::Float32Array::New(js_buffer, 0, byte_length / 4);
@ -110,6 +99,7 @@ void AudioConsumerWrapper::do_wrap(const v8::Local<v8::Object> &obj) {
(void) callback_function.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
}
});
#endif
this->_call_ended = Nan::async_callback([&]{
Nan::HandleScope scope;
@ -131,92 +121,17 @@ void AudioConsumerWrapper::do_wrap(const v8::Local<v8::Object> &obj) {
(void) callback_function.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
});
Nan::DefineOwnProperty(this->handle(), Nan::New<v8::String>("frameSize").ToLocalChecked(), Nan::New<v8::Number>((uint32_t) this->_handle->frame_size), v8::ReadOnly | v8::DontDelete);
Nan::DefineOwnProperty(this->handle(), Nan::New<v8::String>("sampleRate").ToLocalChecked(), Nan::New<v8::Number>((uint32_t) this->_handle->sample_rate), v8::ReadOnly | v8::DontDelete);
Nan::DefineOwnProperty(this->handle(), Nan::New<v8::String>("channelCount").ToLocalChecked(), Nan::New<v8::Number>((uint32_t) this->_handle->channel_count), v8::ReadOnly | v8::DontDelete);
Nan::DefineOwnProperty(this->handle(), Nan::New<v8::String>("sampleRate").ToLocalChecked(), Nan::New<v8::Number>((uint32_t) this->sample_rate_), v8::ReadOnly | v8::DontDelete);
Nan::DefineOwnProperty(this->handle(), Nan::New<v8::String>("channelCount").ToLocalChecked(), Nan::New<v8::Number>((uint32_t) this->channel_count_), v8::ReadOnly | v8::DontDelete);
}
void AudioConsumerWrapper::unbind() {
if(this->_handle) {
lock_guard lock{this->_handle->on_read_lock};
this->_handle->on_read = nullptr;
}
}
void AudioConsumerWrapper::process_data(const void *buffer, size_t samples) {
if(samples != 960) {
logger::error(logger::category::audio, tr("Received audio frame with invalid sample count (Expected 960, Received {})"), samples);
return;
void AudioConsumerWrapper::delete_consumer() {
if(this->consumer_handle) {
std::lock_guard lock{this->consumer_handle->wrapper_mutex};
this->consumer_handle->wrapper = nullptr;
}
lock_guard lock{this->execute_mutex};
if(this->filter_mode_ == FilterMode::BLOCK) { return; }
bool should_process{true};
if(this->filter_mode_ == FilterMode::FILTER) {
auto filters = this->filters();
for(const auto& filter : filters) {
auto _filter = filter->filter();
if(!_filter) continue;
if(_filter->frame_size() != samples) {
cerr << "Tried to use a filter, but frame size does not match!" << endl;
continue;
}
if(!_filter->process(buffer)) {
should_process = false;
break;
}
}
} else if(this->filter_mode_ != FilterMode::BYPASS) {
should_process = false;
}
if(!should_process) {
if(!this->last_consumed) {
this->last_consumed = true;
this->_call_ended();
unique_lock native_read_lock(this->native_read_callback_lock);
if(this->native_read_callback) {
auto callback = this->native_read_callback; /* copy */
native_read_lock.unlock();
callback(nullptr, 0); /* notify end */
}
}
return;
}
if(this->last_consumed) {
this->_call_started();
}
this->last_consumed = false;
{
unique_lock native_read_lock(this->native_read_callback_lock);
if(this->native_read_callback) {
auto callback = this->native_read_callback; /* copy */
native_read_lock.unlock();
callback(buffer, samples);
return;
}
}
auto byte_length = samples * this->_handle->channel_count * 4;
auto buf = make_unique<DataEntry>();
buf->buffer = malloc(byte_length);
memcpy(buf->buffer, buffer, byte_length);
buf->sample_count = samples;
{
lock_guard data_lock{this->_data_lock};
this->_data_entries.push_back(move(buf));
}
this->_call_data();
}
std::shared_ptr<AudioFilterWrapper> AudioConsumerWrapper::create_filter(const std::string& name, const std::shared_ptr<filter::Filter> &impl) {
auto result = shared_ptr<AudioFilterWrapper>(new AudioFilterWrapper(name, impl), [](AudioFilterWrapper* ptr) {
assert(v8::Isolate::GetCurrent());
@ -239,15 +154,17 @@ std::shared_ptr<AudioFilterWrapper> AudioConsumerWrapper::create_filter(const st
}
void AudioConsumerWrapper::delete_filter(const AudioFilterWrapper* filter) {
shared_ptr<AudioFilterWrapper> handle; /* need to keep the handle 'till everything has been finished */
std::shared_ptr<AudioFilterWrapper> handle; /* need to keep the handle 'till everything has been finished */
{
lock_guard lock(this->filter_mutex_);
std::lock_guard lock(this->filter_mutex_);
for(auto& c : this->filter_) {
if(&*c == filter) {
handle = c;
break;
}
}
if(!handle) {
return;
}
@ -261,20 +178,12 @@ void AudioConsumerWrapper::delete_filter(const AudioFilterWrapper* filter) {
}
{
lock_guard lock(this->execute_mutex); /* ensure that the filter isn't used right now */
/* ensure that the filter isn't used right now */
lock_guard lock{this->consumer_handle->wrapper_mutex};
handle->_filter = nullptr;
}
}
void AudioConsumerWrapper::reserve_internal_buffer(int index, size_t target) {
assert(index < kInternalFrameBufferCount);
if(this->internal_frame_buffer_size[index] < target) {
if(this->internal_frame_buffer_size[index]) { ::free(this->internal_frame_buffer[index]); }
this->internal_frame_buffer[index] = malloc(target);
this->internal_frame_buffer_size[index] = target;
}
}
NAN_METHOD(AudioConsumerWrapper::_get_filters) {
auto handle = ObjectWrap::Unwrap<AudioConsumerWrapper>(info.Holder());
@ -308,8 +217,6 @@ NAN_METHOD(AudioConsumerWrapper::_unregister_filter) {
NAN_METHOD(AudioConsumerWrapper::_create_filter_vad) {
auto handle = ObjectWrap::Unwrap<AudioConsumerWrapper>(info.Holder());
auto consumer = handle->_handle;
assert(consumer); /* should never be null! */
if(info.Length() != 1 || !info[0]->IsNumber()) {
Nan::ThrowError("invalid argument");
@ -317,7 +224,7 @@ NAN_METHOD(AudioConsumerWrapper::_create_filter_vad) {
}
string error;
auto filter = make_shared<filter::VadFilter>(consumer->channel_count,consumer->sample_rate,consumer->frame_size);
auto filter = make_shared<filter::VadFilter>(handle->channel_count_, handle->sample_rate_);
if(!filter->initialize(error, info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0), 2)) {
Nan::ThrowError(Nan::New<v8::String>("failed to initialize filter (" + error + ")").ToLocalChecked());
return;
@ -329,20 +236,14 @@ NAN_METHOD(AudioConsumerWrapper::_create_filter_vad) {
NAN_METHOD(AudioConsumerWrapper::_create_filter_threshold) {
auto handle = ObjectWrap::Unwrap<AudioConsumerWrapper>(info.Holder());
auto consumer = handle->_handle;
assert(consumer); /* should never be null! */
if(info.Length() != 1 || !info[0]->IsNumber()) {
Nan::ThrowError("invalid argument");
return;
}
string error;
auto filter = make_shared<filter::ThresholdFilter>(consumer->channel_count,consumer->sample_rate,consumer->frame_size);
if(!filter->initialize(error, (float) info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0), 2)) {
Nan::ThrowError(Nan::New<v8::String>("failed to initialize filter (" + error + ")").ToLocalChecked());
return;
}
auto filter = make_shared<filter::ThresholdFilter>(handle->channel_count_, handle->sample_rate_);
filter->initialize((float) info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0), 2);
auto object = handle->create_filter("threshold", filter);
info.GetReturnValue().Set(object->handle());
@ -350,16 +251,8 @@ NAN_METHOD(AudioConsumerWrapper::_create_filter_threshold) {
NAN_METHOD(AudioConsumerWrapper::_create_filter_state) {
auto handle = ObjectWrap::Unwrap<AudioConsumerWrapper>(info.Holder());
auto consumer = handle->_handle;
assert(consumer); /* should never be null! */
string error;
auto filter = make_shared<filter::StateFilter>(consumer->channel_count, consumer->sample_rate, consumer->frame_size);
if(!filter->initialize(error)) {
Nan::ThrowError(Nan::New<v8::String>("failed to initialize filter (" + error + ")").ToLocalChecked());
return;
}
auto filter = std::make_shared<filter::StateFilter>(handle->channel_count_, handle->sample_rate_);
auto object = handle->create_filter("state", filter);
info.GetReturnValue().Set(object->handle());
}
@ -379,4 +272,75 @@ NAN_METHOD(AudioConsumerWrapper::_set_filter_mode) {
auto value = info[0].As<v8::Number>()->Int32Value(info.GetIsolate()->GetCurrentContext()).FromMaybe(0);
handle->filter_mode_ = (FilterMode) value;
}
void AudioConsumerWrapper::InputConsumer::handle_buffer(const AudioInputBufferInfo &info, const float *buffer) {
std::lock_guard lock{this->wrapper_mutex};
if(!this->wrapper) {
return;
}
this->wrapper->handle_buffer(info, buffer);
}
void AudioConsumerWrapper::handle_buffer(const AudioInputBufferInfo &info, const float *buffer) {
bool should_process;
switch(this->filter_mode_) {
case FilterMode::FILTER:
should_process = true;
for(const auto& filter : this->filters()) {
auto filter_instance = filter->filter();
if(!filter_instance) continue;
if(!filter_instance->process(info, buffer)) {
should_process = false;
break;
}
}
break;
case FilterMode::BYPASS:
should_process = true;
break;
case FilterMode::BLOCK:
default:
should_process = false;
return;
}
if(!should_process) {
if(!this->last_consumed) {
this->last_consumed = true;
this->_call_ended();
std::unique_lock native_read_lock(this->native_read_callback_lock);
if(this->native_read_callback) {
auto callback = this->native_read_callback; /* copy */
native_read_lock.unlock();
callback(nullptr, 0); /* notify end */
}
}
} else {
if(this->last_consumed) {
this->last_consumed = false;
this->_call_started();
}
{
unique_lock native_read_lock(this->native_read_callback_lock);
if(this->native_read_callback) {
auto callback = this->native_read_callback; /* copy */
native_read_lock.unlock();
callback(buffer, info.sample_count);
return;
}
}
/* TODO: Callback JavaScript if required */
}
}

View File

@ -4,6 +4,8 @@
#include <mutex>
#include <deque>
#include <include/NanEventCallback.h>
#include "../AudioInput.h"
#include "../AudioReframer.h"
namespace tc::audio {
class AudioInput;
@ -13,6 +15,7 @@ namespace tc::audio {
class Filter;
}
struct AudioInputBufferInfo;
namespace recorder {
class AudioFilterWrapper;
class AudioRecorderWrapper;
@ -23,6 +26,7 @@ namespace tc::audio {
BLOCK
};
/* FIXME: Rechunk audio data to 20ms with the input frame rate (that's what we need for networking stuff) */
class AudioConsumerWrapper : public Nan::ObjectWrap {
friend class AudioRecorderWrapper;
public:
@ -38,7 +42,7 @@ namespace tc::audio {
return my_constructor_template;
}
AudioConsumerWrapper(AudioRecorderWrapper*, const std::shared_ptr<AudioConsumer>& /* handle */);
AudioConsumerWrapper(const std::shared_ptr<AudioInput>& /* input */);
~AudioConsumerWrapper() override;
static NAN_METHOD(_get_filters);
@ -59,47 +63,38 @@ namespace tc::audio {
return this->filter_;
}
[[nodiscard]] inline auto channel_count() const { return this->channel_count_; };
[[nodiscard]] inline auto sample_rate() const { return this->sample_rate_; };
[[nodiscard]] inline FilterMode filter_mode() const { return this->filter_mode_; }
inline std::shared_ptr<AudioConsumer> native_consumer() { return this->_handle; }
std::mutex native_read_callback_lock;
std::function<void(const void * /* buffer */, size_t /* samples */)> native_read_callback;
std::function<void(const float * /* buffer */, size_t /* samples */)> native_read_callback;
private:
AudioRecorderWrapper* _recorder;
struct InputConsumer : public AudioInputConsumer {
std::mutex wrapper_mutex{};
AudioConsumerWrapper* wrapper{nullptr};
std::mutex execute_mutex;
std::shared_ptr<AudioConsumer> _handle;
void handle_buffer(const AudioInputBufferInfo &, const float *) override;
};
std::shared_ptr<InputConsumer> consumer_handle{};
size_t const channel_count_;
size_t const sample_rate_;
Nan::JavaScriptQueue js_queue;
std::mutex filter_mutex_;
std::deque<std::shared_ptr<AudioFilterWrapper>> filter_;
FilterMode filter_mode_{FilterMode::FILTER};
bool last_consumed = false;
constexpr static auto kInternalFrameBufferCount{2};
void* internal_frame_buffer[kInternalFrameBufferCount]{nullptr};
size_t internal_frame_buffer_size[kInternalFrameBufferCount]{0};
void do_wrap(const v8::Local<v8::Object>& /* object */);
void delete_consumer();
void unbind(); /* called with execute_lock locked */
void process_data(const void* /* buffer */, size_t /* samples */);
void handle_buffer(const AudioInputBufferInfo& /* info */, const float* /* buffer */);
void reserve_internal_buffer(int /* buffer */, size_t /* bytes */);
struct DataEntry {
void* buffer = nullptr;
size_t sample_count = 0;
~DataEntry() {
if(buffer)
free(buffer);
}
};
std::mutex _data_lock;
std::deque<std::unique_ptr<DataEntry>> _data_entries;
Nan::callback_t<> _call_data;
Nan::callback_t<> _call_ended;
Nan::callback_t<> _call_started;
};

View File

@ -1,4 +1,6 @@
#include "AudioFilter.h"
#include <utility>
#include "../filter/FilterVad.h"
#include "../filter/FilterThreshold.h"
#include "../filter/FilterState.h"
@ -44,7 +46,7 @@ NAN_METHOD(AudioFilterWrapper::NewInstance) {
Nan::ThrowError("invalid invoke!");
}
AudioFilterWrapper::AudioFilterWrapper(const std::string& name, const std::shared_ptr<tc::audio::filter::Filter> &filter) : _filter(filter), _name(name) {
AudioFilterWrapper::AudioFilterWrapper(std::string name, std::shared_ptr<tc::audio::filter::Filter> filter) : _filter{std::move(filter)}, _name{std::move(name)} {
auto threshold_filter = dynamic_pointer_cast<filter::ThresholdFilter>(this->_filter);
if(threshold_filter) {
this->_call_analyzed = Nan::async_callback([&](float value) {
@ -56,7 +58,7 @@ AudioFilterWrapper::AudioFilterWrapper(const std::string& name, const std::share
v8::Local<v8::Value> argv[1];
argv[0] = Nan::New<v8::Number>(value);
cb->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
(void) cb->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
}
});
}
@ -99,7 +101,12 @@ NAN_METHOD(AudioFilterWrapper::_get_level) {
return;
}
info.GetReturnValue().Set((int) filter->mode());
auto mode = filter->mode();
if(mode.has_value()) {
info.GetReturnValue().Set((int) *mode);
} else {
/* We're using the preprocessor config */
}
}

View File

@ -25,7 +25,7 @@ namespace tc::audio {
return my_constructor_template;
}
AudioFilterWrapper(const std::string& name, const std::shared_ptr<filter::Filter>& /* handle */);
AudioFilterWrapper(std::string name, std::shared_ptr<filter::Filter> /* handle */);
~AudioFilterWrapper() override;
static NAN_METHOD(_get_name);

View File

@ -77,7 +77,7 @@ AudioRecorderWrapper::~AudioRecorderWrapper() {
}
std::shared_ptr<AudioConsumerWrapper> AudioRecorderWrapper::create_consumer() {
auto result = shared_ptr<AudioConsumerWrapper>(new AudioConsumerWrapper(this, this->input_->create_consumer(960)), [](AudioConsumerWrapper* ptr) {
auto result = std::shared_ptr<AudioConsumerWrapper>(new AudioConsumerWrapper(this->input_), [](AudioConsumerWrapper* ptr) {
assert(v8::Isolate::GetCurrent());
ptr->Unref();
});
@ -120,10 +120,7 @@ void AudioRecorderWrapper::delete_consumer(const AudioConsumerWrapper* consumer)
}
}
{
lock_guard lock(handle->execute_mutex); /* if we delete the consumer while executing strange stuff could happen */
handle->unbind();
}
handle->delete_consumer();
}
void AudioRecorderWrapper::do_wrap(const v8::Local<v8::Object> &obj) {

View File

@ -581,7 +581,9 @@ NAN_METHOD(ServerConnection::send_voice_data_raw) {
auto voice_data = info[0].As<v8::Float32Array>()->Buffer();
auto vs = this->voice_connection ? this->voice_connection->voice_sender() : nullptr;
if(vs) vs->send_data(voice_data->GetContents().Data(), voice_data->GetContents().ByteLength() / (4 * channels), sample_rate, channels);
if(vs) {
vs->send_data((float*) voice_data->GetContents().Data(), voice_data->GetContents().ByteLength() / (4 * channels), sample_rate, channels);
}
}
#ifdef SHUFFLE_VOICE

View File

@ -1,9 +1,10 @@
#include "AudioSender.h"
#include "VoiceConnection.h"
#include "../ServerConnection.h"
#include "../../logger.h"
#include "../../audio/AudioEventLoop.h"
#include "../../audio/AudioMerger.h"
#include "../../audio/AudioReframer.h"
#include "../../audio/codec/OpusConverter.h"
using namespace std;
using namespace tc;
@ -11,129 +12,110 @@ using namespace tc::audio;
using namespace tc::audio::codec;
using namespace tc::connection;
VoiceSender::VoiceSender(tc::connection::VoiceConnection *handle) : handle(handle) {}
VoiceSender::VoiceSender(tc::connection::VoiceConnection *handle) : handle{handle} {}
VoiceSender::~VoiceSender() {
/* Note: We can't be within the event loop since if we were we would have a shared reference*/
audio::encode_event_loop->cancel(dynamic_pointer_cast<event::EventEntry>(this->_ref.lock()));
this->clear_buffer(); /* buffer might be accessed within encode_raw_frame, but this could not be trigered while this will be deallocated! */
}
bool VoiceSender::initialize_codec(std::string& error, connection::codec::value codec, size_t channels, size_t rate, bool reset_encoder) {
auto& data = this->codec[codec];
bool new_allocated = !data;
if(new_allocated) {
data = make_unique<AudioCodec>();
data->packet_counter = 0;
data->last_packet = chrono::system_clock::now();
}
auto info = codec::get_info(codec);
if(!info || !info->supported) {
if(new_allocated)
log_error(category::voice_connection, tr("Tried to send voice packet but we dont support the current codec ({})"), codec);
return false;
}
if(!data->converter) {
data->converter = info->new_converter(error);
if(!data->converter)
return false;
} else if(reset_encoder) {
data->converter->reset_encoder();
}
if(!data->resampler || data->resampler->input_rate() != rate) {
data->resampler = make_shared<AudioResampler>(rate, data->converter->sample_rate(), data->converter->channels());
}
if(!data->resampler->valid()) {
error = "resampler is invalid";
return false;
}
return true;
{
lock_guard buffer_lock{this->raw_audio_buffer_mutex};
while(this->raw_audio_buffers_head) {
auto buffer = std::exchange(this->raw_audio_buffers_head, this->raw_audio_buffers_head->next);
buffer->~AudioFrame();
::free(this->raw_audio_buffers_head);
}
this->raw_audio_buffers_tail = &this->raw_audio_buffers_head;
}
}
void VoiceSender::set_voice_send_enabled(bool flag) {
this->voice_send_enabled = flag;
}
void VoiceSender::send_data(const void *data, size_t samples, size_t rate, size_t channels) {
unique_lock lock{this->_execute_lock};
if(!this->handle) {
log_warn(category::voice_connection, tr("Dropping raw audio frame because of an invalid handle."));
return;
}
lock.unlock();
void VoiceSender::send_data(const float *data, size_t samples, size_t rate, size_t channels) {
if(!this->voice_send_enabled) {
log_warn(category::voice_connection, tr("Dropping raw audio frame because voice sending has been disabled!"));
return;
}
auto frame = make_unique<AudioFrame>();
/* aligned for float values */
const auto aligned_frame_size{((sizeof(AudioFrame) + 3) / sizeof(float)) * sizeof(float)};
auto frame = (AudioFrame*) malloc(aligned_frame_size + samples * channels * sizeof(float));
new (&frame) AudioFrame{};
frame->sample_count = samples;
frame->sample_rate = rate;
frame->channels = channels;
frame->buffer = pipes::buffer{(void*) data, samples * channels * 4};
frame->buffer = (float*) frame + aligned_frame_size / sizeof(float);
memcpy(frame->buffer, data, samples * channels * sizeof(float));
frame->timestamp = chrono::system_clock::now();
{
lock_guard buffer_lock(this->raw_audio_buffer_lock);
this->raw_audio_buffers.push_back(move(frame));
lock_guard buffer_lock(this->raw_audio_buffer_mutex);
*this->raw_audio_buffers_tail = frame;
this->raw_audio_buffers_tail = &frame->next;
}
audio::encode_event_loop->schedule(dynamic_pointer_cast<event::EventEntry>(this->_ref.lock()));
}
void VoiceSender::send_stop() {
unique_lock lock(this->_execute_lock);
if(!this->handle) {
log_warn(category::voice_connection, tr("Dropping audio end frame because of an invalid handle."));
return;
}
lock.unlock();
auto frame = make_unique<AudioFrame>();
frame->sample_rate = 0;
frame->channels = 0;
frame->buffer = pipes::buffer{nullptr, 0};
auto frame = (AudioFrame*) malloc(sizeof(AudioFrame));
new (&frame) AudioFrame{};
frame->timestamp = chrono::system_clock::now();
{
lock_guard buffer_lock(this->raw_audio_buffer_lock);
this->raw_audio_buffers.push_back(move(frame));
lock_guard buffer_lock{this->raw_audio_buffer_mutex};
*this->raw_audio_buffers_tail = frame;
this->raw_audio_buffers_tail = &frame->next;
}
audio::encode_event_loop->schedule(dynamic_pointer_cast<event::EventEntry>(this->_ref.lock()));
}
void VoiceSender::finalize() {
lock_guard lock(this->_execute_lock);
auto execute_lock = this->execute_lock(true);
this->handle = nullptr;
}
void VoiceSender::set_codec(connection::codec::value target_codec) {
this->target_codec_ = target_codec;
}
void VoiceSender::event_execute(const std::chrono::system_clock::time_point &point) {
static auto max_time = chrono::milliseconds(10);
bool reschedule = false;
auto now = chrono::system_clock::now();
while(true) {
unique_lock buffer_lock(this->raw_audio_buffer_lock);
if(this->raw_audio_buffers.empty())
break;
if(chrono::system_clock::now() - now > max_time) {
reschedule = true;
break;
std::unique_lock buffer_lock{this->raw_audio_buffer_mutex};
if(!this->raw_audio_buffers_head) {
break;
}
auto entry = move(this->raw_audio_buffers.front());
this->raw_audio_buffers.pop_front();
auto next_buffer = std::exchange(this->raw_audio_buffers_head, this->raw_audio_buffers_head->next);
if(!this->raw_audio_buffers_head) {
assert(this->raw_audio_buffers_tail == &next_buffer->next);
this->raw_audio_buffers_tail = &this->raw_audio_buffers_head;
}
buffer_lock.unlock();
//TODO: Drop too old buffers!
this->encode_raw_frame(entry);
//TODO: Drop too old buffers!
if(this->handle) {
this->encode_raw_frame(next_buffer);
}
next_buffer->~AudioFrame();
::free(next_buffer);
if(chrono::system_clock::now() - now > max_time) {
reschedule = true;
break;
}
}
if(reschedule) {
@ -142,94 +124,139 @@ void VoiceSender::event_execute(const std::chrono::system_clock::time_point &poi
}
}
void VoiceSender::encode_raw_frame(const std::unique_ptr<AudioFrame> &frame) {
auto codec = this->_current_codec;
auto& codec_data = this->codec[codec];
constexpr static auto kTempBufferMaxSampleCount{1024 * 8};
void VoiceSender::encode_raw_frame(const AudioFrame* frame) {
if(frame->sample_rate == 0) {
/* Audio sequence end */
this->audio_sequence_no = 0;
if(this->current_codec_.has_value()) {
this->flush_current_codec();
bool flag_head = true, flag_reset = true;
if(codec_data) {
if(codec_data->last_packet + chrono::seconds(1) < frame->timestamp)
codec_data->packet_counter = 0;
auto server = this->handle->handle();
server->send_voice_data(nullptr, 0, *this->current_codec_, false);
}
return;
}
flag_head = codec_data->packet_counter < 5;
flag_reset = codec_data->packet_counter == 0;
if(!this->current_codec_.has_value() || *this->current_codec_ != this->target_codec_) {
this->flush_current_codec();
codec_data->packet_counter++;
codec_data->last_packet = frame->timestamp;
}
this->audio_sequence_no = 0;
this->codec_resampler = nullptr;
this->codec_reframer = nullptr;
this->codec_encoder = nullptr;
this->current_codec_ = std::make_optional(this->target_codec_);
if(frame->channels == 0 || frame->sample_rate == 0 || frame->buffer.empty()) {
lock_guard lock(this->_execute_lock);
if(!this->handle) {
log_warn(category::voice_connection, tr("Dropping audio end because of an invalid handle."));
return;
}
std::string error{};
switch(this->target_codec_) {
case codec::OPUS_VOICE:
this->codec_encoder = std::make_unique<audio::codec::OpusAudioEncoder>(OPUS_APPLICATION_VOIP);
if(!this->codec_encoder->initialize(error)) {
log_error(category::voice_connection, tr("Failed to initialize target audio codec {}"), (uint32_t) this->target_codec_);
this->codec_encoder = nullptr;
return;
}
break;
if(codec_data) {
codec_data->packet_counter = 0;
if(auto converter = codec_data->converter; converter) {
log_trace(category::voice_connection, tr("Resetting encoder"));
converter->reset_encoder();
}
}
auto server = this->handle->handle();
server->send_voice_data(this->_buffer, 0, codec, flag_head);
return;
}
case codec::OPUS_MUSIC:
this->codec_encoder = std::make_unique<audio::codec::OpusAudioEncoder>(OPUS_APPLICATION_AUDIO);
if(!this->codec_encoder->initialize(error)) {
log_error(category::voice_connection, tr("Failed to initialize target audio codec {}"), (uint32_t) this->target_codec_);
this->codec_encoder = nullptr;
return;
}
break;
string error;
if(flag_reset) {
log_trace(category::voice_connection, tr("Resetting encoder for voice sender"));
}
if(!this->initialize_codec(error, codec, frame->channels, frame->sample_rate, flag_reset)) {
log_error(category::voice_connection, tr("Failed to initialize codec: {}"), error);
return;
}
default:
log_error(category::voice_connection, tr("Unknown target encode codec {}"), (uint32_t) this->target_codec_);
return;
}
}
auto merged_channel_byte_size = codec_data->converter->channels() * (frame->buffer.length() / frame->channels);
auto estimated_resampled_byte_size = codec_data->resampler->estimated_output_size(merged_channel_byte_size);
this->ensure_buffer(max(estimated_resampled_byte_size, merged_channel_byte_size));
if(!this->codec_encoder) {
/* Codec failed to initialize */
return;
}
auto codec_channels = codec_data->converter->channels();
if(!audio::merge::merge_channels_interleaved(this->_buffer, codec_channels, frame->buffer.data_ptr(), frame->channels, frame->buffer.length() / frame->channels / 4)) {
log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count! Dropping local voice packet"));
return;
}
const auto codec_channel_count = this->codec_encoder->channel_count();
const auto codec_sample_rate = this->codec_encoder->sample_rate();
auto resampled_samples{estimated_resampled_byte_size / frame->channels / sizeof(float)};
if(!codec_data->resampler->process(this->_buffer, this->_buffer, merged_channel_byte_size / codec_channels / 4, resampled_samples)) {
log_error(category::voice_connection, tr("Failed to resample buffer."), resampled_samples);
return;
}
float temp_buffer[kTempBufferMaxSampleCount];
size_t current_sample_count{frame->sample_count};
float* current_sample_buffer;
if(!resampled_samples) {
/* we don't have any data */
return;
}
if(frame->channels != codec_channel_count) {
assert(kTempBufferMaxSampleCount >= frame->sample_count * codec_channel_count);
if(!audio::merge::merge_channels_interleaved(temp_buffer, codec_channel_count, frame->buffer, frame->channels, frame->sample_count)) {
log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count! Dropping local voice packet"));
return;
}
if(resampled_samples * codec_channels * 4 != codec_data->converter->bytes_per_frame()) {
log_error(category::voice_connection,
tr("Could not encode audio frame. Frame length is not equal to code frame length! Codec: {}, Packet: {}"),
codec_data->converter->bytes_per_frame(), resampled_samples * codec_channels * 4
);
return;
}
current_sample_buffer = temp_buffer;
} else {
current_sample_buffer = frame->buffer;
}
char _packet_buffer[512];
auto encoded_bytes = codec_data->converter->encode(error, this->_buffer, _packet_buffer, 512, flag_head);
if(encoded_bytes <= 0) {
log_error(category::voice_connection, tr("Failed to encode voice: {}"), error);
return;
}
if(frame->sample_rate != codec_sample_rate) {
if(!this->codec_resampler || this->codec_resampler->input_rate() != frame->sample_rate) {
this->codec_resampler = std::make_unique<audio::AudioResampler>(frame->sample_rate, codec_sample_rate, codec_channel_count);
}
{
lock_guard lock(this->_execute_lock);
if(!this->handle) {
log_warn(category::voice_connection, tr("Dropping audio frame because of an invalid handle."));
return;
}
size_t resampled_sample_count{this->codec_resampler->estimated_output_size(frame->sample_count)};
assert(kTempBufferMaxSampleCount >= resampled_sample_count * codec_channel_count);
if(!this->codec_resampler->process(temp_buffer, current_sample_buffer, frame->sample_count, resampled_sample_count)) {
log_error(category::voice_connection, tr("Failed to resample buffer. Dropping audio frame"));
return;
}
auto server = this->handle->handle();
server->send_voice_data(_packet_buffer, encoded_bytes, codec, flag_head);
}
current_sample_buffer = temp_buffer;
current_sample_count = resampled_sample_count;
}
if(!this->codec_reframer) {
this->codec_reframer = std::make_unique<audio::AudioReframer>(codec_channel_count, (size_t) (0.02 * codec_sample_rate));
this->codec_reframer->on_frame = [&](const float* sample_buffer) {
assert(this->codec_reframer);
this->handle_network_frame(sample_buffer, this->codec_reframer->target_size(), false);
};
this->codec_reframer->on_flush = [&](const float* sample_buffer, size_t sample_count) {
this->handle_network_frame(sample_buffer, sample_count, true);
};
}
this->codec_reframer->process(current_sample_buffer, current_sample_count);
}
constexpr static auto kMaxPacketSize{1500};
void VoiceSender::handle_network_frame(const float *sample_buffer, size_t sample_count, bool is_flush) {
assert(this->current_codec_.has_value());
assert(this->codec_encoder);
//log_trace(category::voice_connection, tr("Encoding audio chunk of {}/{} aka {}ms with codec {}"),
// sample_count, this->codec_encoder->sample_rate(), sample_count * 1000 / this->codec_encoder->sample_rate(), *this->current_codec_);
char packet_buffer[kMaxPacketSize];
size_t packet_size{kMaxPacketSize};
EncoderBufferInfo buffer_info{};
buffer_info.flush_encoder = is_flush;
buffer_info.sample_count = sample_count;
buffer_info.head_sequence = this->audio_sequence_no++ < 5;
std::string error{};
if(!this->codec_encoder->encode(error, packet_buffer, packet_size, buffer_info, sample_buffer)) {
log_error(category::voice_connection, tr("Failed to encode voice: {}"), error);
return;
}
auto server = this->handle->handle();
server->send_voice_data(packet_buffer, packet_size, *this->current_codec_, buffer_info.head_sequence);
}
void VoiceSender::flush_current_codec() {
if(!this->codec_reframer) {
return;
}
this->codec_reframer->flush();
}

View File

@ -2,14 +2,17 @@
#include <mutex>
#include <memory>
#include <optional>
#include "VoiceClient.h"
namespace tc {
namespace audio {
namespace codec {
class Converter;
class AudioEncoder;
}
class AudioReframer;
class AudioResampler;
class AudioOutputSource;
}
@ -24,66 +27,54 @@ namespace tc {
explicit VoiceSender(VoiceConnection*);
virtual ~VoiceSender();
codec::value get_codec() { return this->_current_codec; }
void set_codec(codec::value value) { this->_current_codec = value; }
void finalize();
codec::value get_codec() { return this->current_codec_.value_or(this->target_codec_); }
void set_codec(codec::value value);
void finalize();
void send_data(const void* /* buffer */, size_t /* samples */, size_t /* sample rate */, size_t /* channels */);
void send_data(const float* /* buffer */, size_t /* samples */, size_t /* sample rate */, size_t /* channels */);
void send_stop();
void set_voice_send_enabled(bool /* flag */);
private:
struct AudioFrame {
AudioFrame* next{nullptr};
float* buffer{nullptr};
size_t sample_count{0};
size_t sample_rate{0};
size_t channels{0};
std::chrono::system_clock::time_point timestamp{};
~AudioFrame() = default;
AudioFrame() = default;
};
std::weak_ptr<VoiceSender> _ref;
VoiceConnection* handle;
struct AudioCodec {
size_t packet_counter = 0;
std::chrono::system_clock::time_point last_packet;
std::mutex raw_audio_buffer_mutex{};
AudioFrame* raw_audio_buffers_head{nullptr};
AudioFrame** raw_audio_buffers_tail{&this->raw_audio_buffers_head};
std::shared_ptr<audio::codec::Converter> converter;
std::shared_ptr<audio::AudioResampler> resampler;
};
std::array<std::unique_ptr<AudioCodec>, codec::MAX + 1> codec{nullptr};
bool voice_send_enabled{false};
bool initialize_codec(std::string&, codec::value /* codec */, size_t /* channels */, size_t /* source sample rate */, bool /* reset decoder */);
/* Codec specific values */
codec::value target_codec_{codec::OPUS_VOICE};
std::optional<codec::value> current_codec_{};
codec::value _current_codec = codec::OPUS_VOICE;
std::unique_ptr<audio::codec::AudioEncoder> codec_encoder{};
std::unique_ptr<audio::AudioResampler> codec_resampler{};
std::unique_ptr<audio::AudioReframer> codec_reframer{};
std::mutex _execute_lock;
size_t audio_sequence_no{0};
void* _buffer = nullptr;
size_t _buffer_size = 0;
void clear_buffer() {
if(this->_buffer)
::free(this->_buffer);
this->_buffer = nullptr;
this->_buffer_size = 0;
}
/* Call these methods only when the execute lock has been accquired */
void encode_raw_frame(const AudioFrame*);
void handle_network_frame(const float* /* buffer */, size_t /* sample count */, bool /* flush */);
void flush_current_codec();
void ensure_buffer(size_t length) {
if(!this->_buffer || this->_buffer_size < length) {
if(this->_buffer)
::free(this->_buffer);
this->_buffer = malloc(length);
this->_buffer_size = length;
}
}
struct AudioFrame {
pipes::buffer buffer;
size_t sample_rate;
size_t channels;
std::chrono::system_clock::time_point timestamp;
};
std::mutex raw_audio_buffer_lock;
std::deque<std::unique_ptr<AudioFrame>> raw_audio_buffers;
bool voice_send_enabled = false;
void encode_raw_frame(const std::unique_ptr<AudioFrame>&);
void event_execute(const std::chrono::system_clock::time_point &point) override;
};
}

View File

@ -20,8 +20,8 @@ VoiceConnectionWrap::~VoiceConnectionWrap() {
auto old_consumer = this->_voice_recoder_ptr;
assert(old_consumer);
lock_guard read_lock(old_consumer->native_consumer()->on_read_lock);
old_consumer->native_consumer()->on_read = nullptr;
std::lock_guard read_lock{old_consumer->native_read_callback_lock};
old_consumer->native_read_callback = nullptr;
}
}
@ -176,14 +176,12 @@ NAN_METHOD(VoiceConnectionWrap::set_audio_source) {
connection->_voice_recoder_ptr = ObjectWrap::Unwrap<audio::recorder::AudioConsumerWrapper>(info[0]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
connection->_voice_recoder_handle.Reset(info[0]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
auto native_consumer = connection->_voice_recoder_ptr->native_consumer();
weak_ptr weak_handle = handle;
auto sample_rate = native_consumer->sample_rate;
auto channels = native_consumer->channel_count;
auto sample_rate = connection->_voice_recoder_ptr->sample_rate();
auto channels = connection->_voice_recoder_ptr->channel_count();
lock_guard read_lock{connection->_voice_recoder_ptr->native_read_callback_lock};
connection->_voice_recoder_ptr->native_read_callback = [weak_handle, sample_rate, channels](const void* buffer, size_t length) {
connection->_voice_recoder_ptr->native_read_callback = [weak_handle, sample_rate, channels](const float* buffer, size_t sample_count) {
auto handle = weak_handle.lock();
if(!handle) {
log_warn(category::audio, tr("Missing voice connection handle. Dropping input!"));
@ -192,8 +190,8 @@ NAN_METHOD(VoiceConnectionWrap::set_audio_source) {
auto sender = handle->voice_sender();
if(sender) {
if(length > 0 && buffer) {
sender->send_data(buffer, length, sample_rate, channels);
if(sample_count > 0 && buffer) {
sender->send_data(buffer, sample_count, sample_rate, channels);
} else {
sender->send_stop();
}