Removed duplicate codec definitions and registering Flac as codec id 6

This commit is contained in:
WolverinDEV 2021-03-29 14:00:38 +02:00
parent 82333000d5
commit 34141b78c0
12 changed files with 749 additions and 776 deletions

View File

@ -12,6 +12,9 @@ namespace tc::event {
class EventEntry {
friend class EventExecutor;
public:
EventEntry() = default;
virtual ~EventEntry() = default;
virtual void event_execute(const std::chrono::system_clock::time_point& /* scheduled timestamp */) = 0;
virtual void event_execute_dropped(const std::chrono::system_clock::time_point& /* scheduled timestamp */) {}

View File

@ -4,31 +4,78 @@
#include "Converter.h"
#include <memory>
#define HAVE_CODEC_OPUS
#ifdef HAVE_CODEC_OPUS
#include "./OpusConverter.h"
#endif
using namespace tc::audio;
using namespace tc::audio::codec;
Converter::Converter(size_t c, size_t s, size_t f) : _channels(c), _sample_rate(s), _frame_size(f) {}
Converter::~Converter() {}
[[nodiscard]] constexpr inline bool audio_codec_supported(const AudioCodec &codec) {
switch (codec) {
bool type::supported(value type) {
#ifdef HAVE_CODEC_OPUS
if(type == type::opus)
return true;
case AudioCodec::OpusVoice:
case AudioCodec::OpusMusic:
return true;
#endif
#ifdef HAVE_CODEC_SPEEX
if(type == type::speex)
return true;
case AudioCodec::Speex:
return true;
#endif
#ifdef HAVE_CODEC_FLAC
if(type == type::flac)
return true;
case AudioCodec::Flac:
return true;
#endif
#ifdef HAVE_CODEC_CELT
if(type == type::celt)
return true;
case AudioCodec::Celt:
return true;
#endif
return false;
default:
return false;
}
}
bool codec::audio_decode_supported(const AudioCodec &codec) {
return audio_codec_supported(codec);
}
bool codec::audio_encode_supported(const AudioCodec &codec) {
return audio_codec_supported(codec);
}
std::unique_ptr<AudioDecoder> codec::create_audio_decoder(const AudioCodec &codec) {
switch (codec) {
#ifdef HAVE_CODEC_OPUS
case AudioCodec::OpusVoice:
case AudioCodec::OpusMusic:
return std::make_unique<OpusAudioDecoder>(codec);
#endif
default:
return nullptr;
}
}
std::unique_ptr<AudioEncoder> codec::create_audio_encoder(const AudioCodec &codec) {
switch (codec) {
#ifdef HAVE_CODEC_OPUS
case AudioCodec::OpusVoice:
case AudioCodec::OpusMusic:
return std::make_unique<OpusAudioEncoder>(codec);
#endif
default:
return nullptr;
}
}

View File

@ -1,66 +1,122 @@
#pragma once
#include <string>
#if !defined(ssize_t) && defined(WIN32)
#define ssize_t int64_t
#endif
#include <memory>
#include <optional>
namespace tc::audio::codec {
namespace type {
enum value {
undefined,
enum struct AudioCodec {
Unknown,
/* supported */
opus,
speex,
/* supported */
OpusVoice,
OpusMusic,
/* unsupported */
flac,
celt
};
/* Not yet supported */
Flac,
extern bool supported(value);
/* Removed in summer 2020 */
SpeexNarrow,
SpeexWide,
SpeexUltraWide,
Celt,
};
class AudioEncoder;
class AudioDecoder;
[[nodiscard]] extern bool audio_encode_supported(const AudioCodec& /* codec */);
[[nodiscard]] extern std::unique_ptr<AudioEncoder> create_audio_encoder(const AudioCodec& /* codec */);
[[nodiscard]] extern bool audio_decode_supported(const AudioCodec& /* codec */);
[[nodiscard]] extern std::unique_ptr<AudioDecoder> create_audio_decoder(const AudioCodec& /* codec */);
[[nodiscard]] constexpr inline std::optional<uint8_t> audio_codec_to_protocol_id(const AudioCodec& codec) {
switch(codec) {
case AudioCodec::SpeexNarrow:
return std::make_optional(0);
case AudioCodec::SpeexWide:
return std::make_optional(1);
case AudioCodec::SpeexUltraWide:
return std::make_optional(2);
case AudioCodec::Celt:
return std::make_optional(3);
case AudioCodec::OpusVoice:
return std::make_optional(4);
case AudioCodec::OpusMusic:
return std::make_optional(5);
case AudioCodec::Flac:
return std::make_optional(6);
default:
return std::nullopt;
}
}
class Converter {
public:
Converter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */);
virtual ~Converter();
[[nodiscard]] constexpr inline std::optional<AudioCodec> audio_codec_from_protocol_id(uint8_t id) {
switch(id) {
case 0:
return std::make_optional(AudioCodec::SpeexNarrow);
/* initialize parameters depend on the codec */
virtual bool valid() = 0;
virtual void finalize() = 0;
case 1:
return std::make_optional(AudioCodec::SpeexWide);
virtual void reset_encoder() = 0;
virtual void reset_decoder() = 0;
case 2:
return std::make_optional(AudioCodec::SpeexUltraWide);
/**
* @return number of bytes written on success
*/
virtual ssize_t encode(std::string& /* error */, const void* /* source */, void* /* destination */, size_t /* destination byte length */, bool /* head package */) = 0;
case 3:
return std::make_optional(AudioCodec::Celt);
/**
* @return number of samples on success
*/
virtual ssize_t decode(std::string& /* error */, const void* /* source */, size_t /* source byte length */, void* /* destination */, bool /* fec decoding */) = 0;
virtual ssize_t decode_lost(std::string& /* error */, size_t /* packets */) = 0;
case 4:
return std::make_optional(AudioCodec::OpusVoice);
virtual size_t expected_encoded_length(size_t /* sample count */) = 0;
virtual size_t expected_decoded_length(const void* /* source */, size_t /* source byte length */) {
return this->bytes_per_frame();
}
case 5:
return std::make_optional(AudioCodec::OpusMusic);
inline size_t channels() { return this->_channels; }
inline size_t sample_rate() { return this->_sample_rate; }
inline size_t frame_size() { return this->_frame_size; }
case 6:
return std::make_optional(AudioCodec::Flac);
inline size_t bytes_per_frame() { return this->_channels * this->_frame_size * 4; }
protected:
size_t _frame_size;
size_t _channels;
size_t _sample_rate;
};
default:
return std::nullopt;
}
}
[[nodiscard]] constexpr inline const char* audio_codec_name(const AudioCodec& codec) {
switch(codec) {
case AudioCodec::SpeexNarrow:
return "speex narrow";
case AudioCodec::SpeexWide:
return "speex wide";
case AudioCodec::SpeexUltraWide:
return "speex ultra wide";
case AudioCodec::Celt:
return "celt";
case AudioCodec::OpusVoice:
return "opus voice";
case AudioCodec::OpusMusic:
return "opus music";
case AudioCodec::Flac:
return "flac";
case AudioCodec::Unknown:
return "unknown";
default:
return "invalid";
}
}
struct EncoderBufferInfo {
size_t sample_count{0};
@ -104,5 +160,38 @@ namespace tc::audio::codec {
[[nodiscard]] virtual bool encode(std::string& /* error */, void* /* target buffer */, size_t& /* target length */, const EncoderBufferInfo& /* buffer info */, const float* /* samples */) = 0;
};
class AudioDecoder {};
struct DecodePayloadInfo {
/**
* Use a value of zero to indicate packet loss
*/
size_t byte_length{0};
bool fec_decode{false};
};
class AudioDecoder {
public:
explicit AudioDecoder() = default;
virtual ~AudioDecoder() = default;
[[nodiscard]] virtual bool valid() const = 0;
[[nodiscard]] virtual bool initialize(std::string& /* error */) = 0;
virtual void reset_sequence() = 0;
/**
* Get the codecs sample rate (will be the output rate)
*/
[[nodiscard]] virtual size_t sample_rate() const = 0;
/**
* Get the codecs audio channel count.
*/
[[nodiscard]] virtual size_t channel_count() const = 0;
/**
* @returns the expected sample count
*/
[[nodiscard]] virtual size_t expected_decoded_length(const void* /* payload */, size_t /* payload length */) const = 0;
[[nodiscard]] virtual bool decode(std::string& /* error */, float* /* target buffer */, size_t& /* target sample count */, const DecodePayloadInfo& /* payload info */, const void* /* payload */) = 0;
};
}

View File

@ -4,162 +4,8 @@
using namespace std;
using namespace tc::audio::codec;
OpusConverter::OpusConverter(size_t c, size_t s, size_t f) : Converter(c, s, f) { }
OpusConverter::~OpusConverter() = default;
bool OpusConverter::valid() {
return this->encoder && this->decoder;
}
bool OpusConverter::initialize(std::string &error, int application_type) {
lock_guard lock(this->coder_lock);
this->_application_type = application_type;
if(!this->_initialize_encoder(error))
return false;
if(!this->_initialize_decoder(error)) {
this->reset_encoder();
return false;
}
return true;
}
void OpusConverter::reset_encoder() {
lock_guard lock(this->coder_lock);
log_info(category::audio, tr("Resetting encoder"));
auto result = opus_encoder_ctl(this->encoder, OPUS_RESET_STATE);
if(result != OPUS_OK)
log_warn(category::audio, tr("Failed to reset opus encoder. Opus result: {}"), result);
}
void OpusConverter::reset_decoder() {
lock_guard lock(this->coder_lock);
log_info(category::audio, tr("Resetting decoder"));
auto result = opus_decoder_ctl(this->decoder, OPUS_RESET_STATE);
if(result != OPUS_OK)
log_warn(category::audio, tr("Failed to reset opus decoder. Opus result: {}"), result);
this->fec_decoder_ = true;
}
void OpusConverter::finalize() {
lock_guard lock(this->coder_lock);
if(this->encoder) opus_encoder_destroy(this->encoder);
this->encoder = nullptr;
if(this->decoder) opus_decoder_destroy(this->decoder);
this->decoder = nullptr;
}
ssize_t OpusConverter::encode(std::string &error, const void *source, void *target, size_t target_length, bool head_package) {
lock_guard lock(this->coder_lock);
opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(head_package ? 100 : 15));
auto result = opus_encode_float(this->encoder, (float*) source, (int) this->_frame_size, (uint8_t*) target, (opus_int32) target_length);
if(result < OPUS_OK) {
error = to_string(result) + "|" + opus_strerror(result);
return -1;
}
return result;
}
ssize_t OpusConverter::decode(std::string &error, const void *source, size_t source_length, void *target, bool use_fec) {
lock_guard lock(this->coder_lock);
auto result = opus_decode_float(this->decoder, (uint8_t*) source, (opus_int32) source_length, (float*) target, (int) this->_frame_size, use_fec ? 1 : 0);
if(result < OPUS_OK) {
error = to_string(result) + "|" + opus_strerror(result);
return -1;
}
return result;
}
ssize_t OpusConverter::decode_lost(std::string &error, size_t packets) {
lock_guard lock(this->coder_lock);
auto buffer = (float*) malloc(this->_frame_size * this->_channels * sizeof(float));
while (packets-- > 0) {
auto result = opus_decode_float(this->decoder, nullptr, 0, buffer, (int) this->_frame_size, false);
if(result < OPUS_OK)
log_warn(category::audio, tr("Opus decode lost resulted in error: {}"), result);
}
this->fec_decoder_ = true;
free(buffer);
return 0;
}
size_t OpusConverter::expected_encoded_length(size_t sample_count) {
//TODO calculate stuff
return 512;
}
bool OpusConverter::_initialize_decoder(std::string &error) {
if(!this->_finalize_decoder(error))
return false;
int error_id = 0;
this->decoder = opus_decoder_create((opus_int32) this->_sample_rate, (int) this->_channels, &error_id);
if(!this->encoder || error_id) {
error = "failed to create decoder (" + to_string(error_id) + ")";
return false;
}
return true;
}
bool OpusConverter::_initialize_encoder(std::string &error) {
if(!this->_finalize_encoder(error))
return false;
int error_id = 0;
this->encoder = opus_encoder_create((opus_int32) this->_sample_rate, (int) this->_channels, this->_application_type, &error_id);
if(!this->encoder || error_id) {
error = "failed to create encoder (" + to_string(error_id) + ")";
return false;
}
error_id = opus_encoder_ctl(encoder, OPUS_SET_BITRATE(64000));
if(error_id) {
error = "failed to set bitrate (" + to_string(error_id) + ")";
return false;
}
error_id = opus_encoder_ctl(encoder, OPUS_SET_INBAND_FEC(1));
if(error_id) {
error = "failed to enable fec (" + to_string(error_id) + ")";
return false;
}
error_id = opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(15));
if(error_id) {
error = "failed to assume a 15% packet loss (" + to_string(error_id) + ")";
return false;
}
return true;
}
bool OpusConverter::_finalize_decoder(std::string &) {
if(this->decoder) {
opus_decoder_destroy(this->decoder);
this->decoder = nullptr;
}
return true;
}
bool OpusConverter::_finalize_encoder(std::string &) {
if(this->encoder) {
opus_encoder_destroy(this->encoder);
this->encoder = nullptr;
}
return true;
}
OpusAudioEncoder::OpusAudioEncoder(int application_type) : application_type_{application_type} {}
/* The opus encoder */
OpusAudioEncoder::OpusAudioEncoder(AudioCodec target_codec) : target_codec_{target_codec} {}
OpusAudioEncoder::~OpusAudioEncoder() noexcept {
if(this->encoder) {
opus_encoder_destroy(this->encoder);
@ -172,8 +18,23 @@ bool OpusAudioEncoder::valid() const {
}
bool OpusAudioEncoder::initialize(string &error) {
int error_id = 0;
this->encoder = opus_encoder_create((opus_int32) this->sample_rate(), (int) this->channel_count(), this->application_type_, &error_id);
int application_type;
switch(this->target_codec_) {
case AudioCodec::OpusVoice:
application_type = OPUS_APPLICATION_VOIP;
break;
case AudioCodec::OpusMusic:
application_type = OPUS_APPLICATION_AUDIO;
break;
default:
error = "target codec isn't opus";
return false;
}
int error_id{0};
this->encoder = opus_encoder_create((opus_int32) this->sample_rate(), (int) this->channel_count(), application_type, &error_id);
if(!this->encoder || error_id) {
error = "failed to create encoder (" + to_string(error_id) + ")";
goto cleanup_error;
@ -219,7 +80,7 @@ size_t OpusAudioEncoder::sample_rate() const {
}
size_t OpusAudioEncoder::channel_count() const {
if(this->application_type() == OPUS_APPLICATION_AUDIO) {
if(this->target_codec_ == AudioCodec::OpusMusic) {
return 2;
} else {
return 1;
@ -248,3 +109,70 @@ bool OpusAudioEncoder::encode(std::string& error, void *target_buffer, size_t &t
target_size = (size_t) result;
return true;
}
/* The Opus decoder */
OpusAudioDecoder::OpusAudioDecoder(AudioCodec target_codec) : target_codec_{target_codec} {}
OpusAudioDecoder::~OpusAudioDecoder() noexcept {
if(this->decoder) {
opus_decoder_destroy(this->decoder);
this->decoder = nullptr;
}
};
bool OpusAudioDecoder::valid() const {
return this->decoder != nullptr;
}
bool OpusAudioDecoder::initialize(string &error) {
int error_id{0};
this->decoder = opus_decoder_create((opus_int32) this->sample_rate(), (int) this->channel_count(), &error_id);
if(!this->decoder || error_id) {
error = "failed to create decoder (" + to_string(error_id) + ")";
return false;
}
return true;
}
void OpusAudioDecoder::reset_sequence() {
auto result = opus_decoder_ctl(this->decoder, OPUS_RESET_STATE);
if(result != OPUS_OK) {
log_warn(category::audio, tr("Failed to reset opus decoder. Opus result: {}"), result);
}
}
size_t OpusAudioDecoder::sample_rate() const {
return 48000;
}
size_t OpusAudioDecoder::channel_count() const {
if(this->target_codec_ == AudioCodec::OpusMusic) {
return 2;
} else {
return 1;
}
}
size_t OpusAudioDecoder::expected_decoded_length(const void *payload, size_t payload_size) const {
auto result = opus_decoder_get_nb_samples(this->decoder, (uint8_t*) payload, payload_size);
if(result <= 0) {
return 0;
}
return (size_t) result;
}
bool OpusAudioDecoder::decode(string &error, float *sample_buffer, size_t &sample_count, const DecodePayloadInfo &info, const void *payload) {
auto result = opus_decode_float(this->decoder,
info.byte_length == 0 ? nullptr : (uint8_t*) payload, (opus_int32) info.byte_length,
(float*) sample_buffer, (int) sample_count,
info.fec_decode ? 1 : 0
);
if(result < 0) {
error = to_string(result) + "|" + opus_strerror(result);
return false;
}
sample_count = (size_t) result;
return true;
}

View File

@ -5,42 +5,9 @@
#include <mutex>
namespace tc::audio::codec {
class OpusConverter : public Converter {
public:
OpusConverter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */);
virtual ~OpusConverter();
bool valid() override;
bool initialize(std::string& /* error */, int /* application type */);
void finalize() override;
void reset_encoder() override;
void reset_decoder() override;
ssize_t encode(std::string & /* error */, const void * /* source */, void * /* target */, size_t /* target size */, bool /* head package */) override;
ssize_t decode(std::string & /* error */, const void * /* source */, size_t /* source size */, void *pVoid1, bool /* use fec */) override;
ssize_t decode_lost(std::string &string, size_t /* packets */) override;
size_t expected_encoded_length(size_t size) override;
private:
std::mutex coder_lock;
OpusDecoder* decoder = nullptr;
OpusEncoder* encoder = nullptr;
bool fec_decoder_{false};
int _application_type = 0;
bool _finalize_encoder(std::string& /* error */);
bool _finalize_decoder(std::string& /* error */);
bool _initialize_encoder(std::string& /* error */);
bool _initialize_decoder(std::string& /* error */);
};
class OpusAudioEncoder : public AudioEncoder {
public:
explicit OpusAudioEncoder(int /* application type */);
explicit OpusAudioEncoder(AudioCodec /* target codec */);
~OpusAudioEncoder() override;
bool valid() const override;
@ -57,10 +24,32 @@ namespace tc::audio::codec {
bool encode(std::string&, void *, size_t &, const EncoderBufferInfo &, const float *) override;
[[nodiscard]] inline auto application_type() const { return this->application_type_; }
private:
int application_type_;
AudioCodec target_codec_;
OpusEncoder* encoder{nullptr};
};
class OpusAudioDecoder : public AudioDecoder {
public:
explicit OpusAudioDecoder(AudioCodec /* target codec */);
~OpusAudioDecoder() override;
bool valid() const override;
bool initialize(std::string &string) override;
void reset_sequence() override;
size_t sample_rate() const override;
size_t channel_count() const override;
size_t expected_decoded_length(const void *pVoid, size_t size) const override;
bool decode(std::string &, float *, size_t &, const DecodePayloadInfo &, const void *) override;
private:
AudioCodec target_codec_;
OpusDecoder* decoder{nullptr};
};
}

View File

@ -18,10 +18,6 @@
#include <query/Command.h>
#include "ServerConnection.h"
namespace ts::connection {
class CryptionHandler;
}
namespace tc::connection {
class ServerConnection;

View File

@ -4,7 +4,6 @@
#include "../../audio/AudioEventLoop.h"
#include "../../audio/AudioMerger.h"
#include "../../audio/AudioReframer.h"
#include "../../audio/codec/OpusConverter.h"
using namespace std;
using namespace tc;
@ -43,7 +42,7 @@ void VoiceSender::send_data(const float *data, size_t samples, size_t rate, size
const auto aligned_frame_size{((sizeof(AudioFrame) + 3) / sizeof(float)) * sizeof(float)};
auto frame = (AudioFrame*) malloc(aligned_frame_size + samples * channels * sizeof(float));
new (&frame) AudioFrame{};
new (frame) AudioFrame{};
frame->sample_count = samples;
frame->sample_rate = rate;
@ -65,7 +64,7 @@ void VoiceSender::send_data(const float *data, size_t samples, size_t rate, size
void VoiceSender::send_stop() {
auto frame = (AudioFrame*) malloc(sizeof(AudioFrame));
new (&frame) AudioFrame{};
new (frame) AudioFrame{};
frame->timestamp = chrono::system_clock::now();
{
@ -82,10 +81,6 @@ void VoiceSender::finalize() {
this->handle = nullptr;
}
void VoiceSender::set_codec(connection::codec::value target_codec) {
this->target_codec_ = target_codec;
}
void VoiceSender::event_execute(const std::chrono::system_clock::time_point &point) {
static auto max_time = chrono::milliseconds(10);
@ -129,47 +124,52 @@ void VoiceSender::encode_raw_frame(const AudioFrame* frame) {
if(frame->sample_rate == 0) {
/* Audio sequence end */
this->audio_sequence_no = 0;
if(this->current_codec_.has_value()) {
auto codec_protocol_id = audio::codec::audio_codec_to_protocol_id(this->current_codec);
if(codec_protocol_id.has_value()) {
this->flush_current_codec();
if(this->codec_encoder) {
this->codec_encoder->reset_sequence();
}
auto server = this->handle->handle();
server->send_voice_data(nullptr, 0, *this->current_codec_, false);
server->send_voice_data(nullptr, 0, *codec_protocol_id, false);
}
return;
}
if(!this->current_codec_.has_value() || *this->current_codec_ != this->target_codec_) {
if(this->current_codec != this->target_codec_) {
auto codec_protocol_id = audio::codec::audio_codec_to_protocol_id(this->target_codec_);
if(!codec_protocol_id.has_value()) {
/* we can't send it so no need to initialize it */
return;
}
this->flush_current_codec();
this->audio_sequence_no = 0;
this->codec_resampler = nullptr;
this->codec_reframer = nullptr;
this->codec_encoder = nullptr;
this->current_codec_ = std::make_optional(this->target_codec_);
this->current_codec = this->target_codec_;
if(!audio::codec::audio_encode_supported(this->current_codec)) {
log_warn(category::voice_connection, tr("Audio sender set to codec where encoding is not supported. Do not send any audio data."));
return;
}
this->codec_encoder = audio::codec::create_audio_encoder(this->current_codec);
if(!this->codec_encoder) {
log_error(category::voice_connection, tr("Failed to allocate new audio encoder for codec {}"), (uint32_t) this->target_codec_);
return;
}
std::string error{};
switch(this->target_codec_) {
case codec::OPUS_VOICE:
this->codec_encoder = std::make_unique<audio::codec::OpusAudioEncoder>(OPUS_APPLICATION_VOIP);
if(!this->codec_encoder->initialize(error)) {
log_error(category::voice_connection, tr("Failed to initialize target audio codec {}"), (uint32_t) this->target_codec_);
this->codec_encoder = nullptr;
return;
}
break;
case codec::OPUS_MUSIC:
this->codec_encoder = std::make_unique<audio::codec::OpusAudioEncoder>(OPUS_APPLICATION_AUDIO);
if(!this->codec_encoder->initialize(error)) {
log_error(category::voice_connection, tr("Failed to initialize target audio codec {}"), (uint32_t) this->target_codec_);
this->codec_encoder = nullptr;
return;
}
break;
default:
log_error(category::voice_connection, tr("Unknown target encode codec {}"), (uint32_t) this->target_codec_);
return;
if(!this->codec_encoder->initialize(error)) {
log_error(category::voice_connection, tr("Failed to initialize auto encoder (codec {}) {}"), (uint32_t) this->target_codec_, error);
this->codec_encoder = nullptr;
return;
}
}
@ -229,8 +229,11 @@ void VoiceSender::encode_raw_frame(const AudioFrame* frame) {
constexpr static auto kMaxPacketSize{1500};
void VoiceSender::handle_network_frame(const float *sample_buffer, size_t sample_count, bool is_flush) {
assert(this->current_codec_.has_value());
assert(this->codec_encoder);
auto codec_protocol_id = audio::codec::audio_codec_to_protocol_id(this->current_codec);
if(!codec_protocol_id.has_value()) {
return;
}
//log_trace(category::voice_connection, tr("Encoding audio chunk of {}/{} aka {}ms with codec {}"),
// sample_count, this->codec_encoder->sample_rate(), sample_count * 1000 / this->codec_encoder->sample_rate(), *this->current_codec_);
@ -249,8 +252,13 @@ void VoiceSender::handle_network_frame(const float *sample_buffer, size_t sample
return;
}
if(!packet_size) {
/* No audio packet created */
return;
}
auto server = this->handle->handle();
server->send_voice_data(packet_buffer, packet_size, *this->current_codec_, buffer_info.head_sequence);
server->send_voice_data(packet_buffer, packet_size, *codec_protocol_id, buffer_info.head_sequence);
}
void VoiceSender::flush_current_codec() {

View File

@ -3,7 +3,7 @@
#include <mutex>
#include <memory>
#include <optional>
#include "VoiceClient.h"
#include "./VoiceClient.h"
namespace tc {
namespace audio {
@ -20,17 +20,17 @@ namespace tc {
namespace connection {
class VoiceConnection;
class VoiceSender : public event::EventEntry {
template<typename _Tp, typename _Up>
friend inline std::shared_ptr<_Tp> std::static_pointer_cast(const std::shared_ptr<_Up>& __r) noexcept;
friend class VoiceConnection;
public:
using AudioCodec = audio::codec::AudioCodec;
explicit VoiceSender(VoiceConnection*);
virtual ~VoiceSender();
void finalize();
codec::value get_codec() { return this->current_codec_.value_or(this->target_codec_); }
void set_codec(codec::value value);
[[nodiscard]] inline auto target_codec() const { return this->target_codec_; }
inline void set_codec(const AudioCodec& target) { this->target_codec_ = target; }
void send_data(const float* /* buffer */, size_t /* samples */, size_t /* sample rate */, size_t /* channels */);
void send_stop();
@ -61,8 +61,8 @@ namespace tc {
bool voice_send_enabled{false};
/* Codec specific values */
codec::value target_codec_{codec::OPUS_VOICE};
std::optional<codec::value> current_codec_{};
AudioCodec target_codec_{AudioCodec::Unknown};
AudioCodec current_codec{AudioCodec::Unknown};
std::unique_ptr<audio::codec::AudioEncoder> codec_encoder{};
std::unique_ptr<audio::AudioResampler> codec_resampler{};

View File

@ -14,57 +14,6 @@ extern tc::audio::AudioOutput* global_audio_output;
#define DEBUG_PREMATURE_PACKETS
#ifdef WIN32
#define _field_(name, value) value
#else
#define _field_(name, value) .name = value
#endif
const codec::CodecInfo codec::info[6] = {
{
_field_(supported, false),
_field_(name, "speex_narrowband"),
_field_(new_converter, nullptr)
},
{
_field_(supported, false),
_field_(name, "speex_wideband"),
_field_(new_converter, nullptr)
},
{
_field_(supported, false),
_field_(name, "speex_ultra_wideband"),
_field_(new_converter, nullptr)
},
{
_field_(supported, false),
_field_(name, "celt_mono"),
_field_(new_converter, nullptr)
},
{
_field_(supported, true),
_field_(name, "opus_voice"),
_field_(new_converter, [](string& error) -> std::shared_ptr<Converter> {
auto result = std::make_shared<OpusConverter>(1, 48000, 960);
if(!result->initialize(error, OPUS_APPLICATION_VOIP)) {
return nullptr;
}
return std::dynamic_pointer_cast<Converter>(result);
})
},
{
_field_(supported, true),
_field_(name, "opus_music"),
_field_(new_converter, [](string& error) -> std::shared_ptr<Converter> {
auto result = std::make_shared<OpusConverter>(2, 48000, 960);
if(!result->initialize(error, OPUS_APPLICATION_AUDIO)) {
return nullptr;
}
return std::dynamic_pointer_cast<Converter>(result);
})
}
};
void VoiceClientWrap::do_wrap(const v8::Local<v8::Object> &object) {
this->Wrap(object);
@ -267,7 +216,7 @@ void VoiceClient::initialize() {
void VoiceClient::execute_tick() {
switch(this->state_) {
case state::buffering:
if(this->_last_received_packet + chrono::milliseconds{250} < chrono::system_clock::now()) {
if(this->packet_queue.last_packet_timestamp + chrono::milliseconds{250} < chrono::system_clock::now()) {
this->set_state(state::stopped);
log_debug(category::audio, tr("Audio stop packet for client {} seems to be lost. Stopping playback."), this->client_id_);
}
@ -341,9 +290,6 @@ void VoiceClient::finalize_js_object() {
* @param clip_window The size how long the "overflow" counts
* @return true if lower is less than upper
*/
#ifdef max
#undef max
#endif
inline constexpr bool packet_id_less(uint16_t lower, uint16_t upper, uint16_t window) {
constexpr auto bounds = std::numeric_limits<uint16_t>::max();
@ -353,8 +299,9 @@ inline constexpr bool packet_id_less(uint16_t lower, uint16_t upper, uint16_t wi
return true;
} else if(upper > lower) {
return true;
} else {
return false;
}
return false;
} else {
if(lower >= upper) {
return false;
@ -365,70 +312,48 @@ inline constexpr bool packet_id_less(uint16_t lower, uint16_t upper, uint16_t wi
}
inline constexpr uint16_t packet_id_diff(uint16_t lower, uint16_t upper) {
if(upper < lower)
return (uint16_t) (((uint32_t) upper | 0x10000U) - (uint32_t) lower);
if(upper < lower) {
return (uint16_t) (((uint32_t) upper | 0x10000U) - (uint32_t) lower);
}
return upper - lower;
}
#define MAX_LOST_PACKETS (6)
#define TEMP_BUFFER_LENGTH 16384
void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& buffer, codec::value buffer_codec, bool is_head) {
void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& buffer, uint8_t buffer_codec, bool is_head) {
#if 0
if(rand() % 10 == 0) {
log_info(category::audio, tr("Dropping audio packet id {}"), packet_id);
return;
}
#endif
if(buffer_codec < 0 || buffer_codec > this->codec.size()) {
log_warn(category::voice_connection, tr("Received voice packet from client {} with unknown codec ({})"), this->client_id_, buffer_codec);
return;
}
if(!this->output_source) {
/* audio hasn't been initialized yet */
return;
}
auto& codec_data = this->codec[buffer_codec];
if(codec_data.state == AudioCodec::State::UNINITIALIZED)
this->initialize_code(buffer_codec);
if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) {
log_warn(category::voice_connection, tr("Dropping audio packet because audio codec {} hasn't been initialized successfully (state: {})"), buffer_codec, (int) codec_data.state);
return;
}
//TODO: short circuit handling if we've muted him (e.g. volume = 0)
auto encoded_buffer = new EncodedBuffer{};
encoded_buffer->packet_id = packet_id;
encoded_buffer->codec = buffer_codec;
encoded_buffer->receive_timestamp = chrono::system_clock::now();
encoded_buffer->buffer = buffer.own_buffer();
encoded_buffer->head = is_head;
this->_last_received_packet = encoded_buffer->receive_timestamp;
{
lock_guard lock{codec_data.pending_lock};
if(codec_data.stream_timeout() < encoded_buffer->receive_timestamp) {
lock_guard lock{this->packet_queue.pending_lock};
if(this->packet_queue.stream_timeout() < encoded_buffer->receive_timestamp) {
//Old stream hasn't been terminated successfully.
//TODO: Cleanup packets which are too old?
codec_data.force_replay = encoded_buffer;
this->packet_queue.force_replay = encoded_buffer;
} else if(encoded_buffer->buffer.empty()) {
//Flush replay and stop
codec_data.force_replay = encoded_buffer;
this->packet_queue.force_replay = encoded_buffer;
}
if(packet_id_less(encoded_buffer->packet_id, codec_data.last_packet_id, MAX_LOST_PACKETS) || encoded_buffer->packet_id == codec_data.last_packet_id) {
if(packet_id_less(encoded_buffer->packet_id, this->packet_queue.last_packet_id, MAX_LOST_PACKETS) || encoded_buffer->packet_id == this->packet_queue.last_packet_id) {
log_debug(category::voice_connection,
tr("Received audio packet which is older than the current index (packet: {}, current: {})"), encoded_buffer->packet_id, codec_data.last_packet_id);
tr("Received audio packet which is older than the current index (packet: {}, current: {})"), encoded_buffer->packet_id, this->packet_queue.last_packet_id);
return;
}
/* insert the new buffer */
{
EncodedBuffer* prv_head{nullptr};
auto head{codec_data.pending_buffers};
auto head{this->packet_queue.pending_buffers};
while(head && packet_id_less(head->packet_id, encoded_buffer->packet_id, MAX_LOST_PACKETS)) {
prv_head = head;
head = head->next;
@ -438,14 +363,14 @@ void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& b
if(prv_head) {
prv_head->next = encoded_buffer;
} else {
codec_data.pending_buffers = encoded_buffer;
this->packet_queue.pending_buffers = encoded_buffer;
}
}
codec_data.last_packet_timestamp = encoded_buffer->receive_timestamp;
codec_data.process_pending = true;
this->packet_queue.last_packet_timestamp = encoded_buffer->receive_timestamp;
this->packet_queue.process_pending = true;
}
audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(this->ref()));
audio::decode_event_loop->schedule(dynamic_pointer_cast<event::EventEntry>(this->ref()));
}
void VoiceClient::cancel_replay() {
@ -466,17 +391,13 @@ void VoiceClient::cancel_replay() {
}
void VoiceClient::drop_enqueued_buffers() {
for(auto& codec_entry : this->codec) {
auto head = codec_entry.pending_buffers;
while(head) {
auto tmp = head->next;
delete head;
head = tmp;
}
auto head = std::exchange(this->packet_queue.pending_buffers, nullptr);
while(head) {
delete std::exchange(head, head->next);
}
codec_entry.pending_buffers = nullptr;
codec_entry.force_replay = nullptr;
}
this->packet_queue.pending_buffers = nullptr;
this->packet_queue.force_replay = nullptr;
}
void VoiceClient::event_execute(const std::chrono::system_clock::time_point &scheduled) {
@ -487,341 +408,370 @@ void VoiceClient::event_execute(const std::chrono::system_clock::time_point &sch
}
static auto max_time = chrono::milliseconds(10);
auto reschedule{false};
string error;
auto timeout = chrono::system_clock::now() + max_time;
std::unique_lock lock{this->packet_queue.pending_lock};
while(this->packet_queue.process_pending) {
assert(lock.owns_lock());
EncodedBuffer* replay_head{nullptr};
uint16_t local_last_pid{this->packet_queue.last_packet_id};
for(auto& audio_codec : this->codec) {
std::unique_lock lock{audio_codec.pending_lock};
while(audio_codec.process_pending) {
assert(lock.owns_lock());
EncodedBuffer* replay_head{nullptr};
uint16_t local_last_pid{audio_codec.last_packet_id};
/* nothing to play */
if(!this->packet_queue.pending_buffers) {
this->packet_queue.process_pending = false;
break;
}
/* nothing to play */
if(!audio_codec.pending_buffers) {
audio_codec.process_pending = false;
break;
}
if(this->packet_queue.force_replay) {
replay_head = this->packet_queue.pending_buffers;
this->packet_queue.pending_buffers = this->packet_queue.force_replay->next;
if(audio_codec.force_replay) {
replay_head = audio_codec.pending_buffers;
audio_codec.pending_buffers = audio_codec.force_replay->next;
this->packet_queue.force_replay->next = nullptr;
this->packet_queue.force_replay = nullptr;
} else {
EncodedBuffer* prv_head{nullptr};
EncodedBuffer* head{nullptr};
audio_codec.force_replay->next = nullptr;
audio_codec.force_replay = nullptr;
} else {
EncodedBuffer* prv_head{nullptr};
EncodedBuffer* head{nullptr};
//Trying to replay the sequence
head = this->packet_queue.pending_buffers;
while(head && head->packet_id == this->packet_queue.last_packet_id + 1) {
if(!replay_head) {
replay_head = this->packet_queue.pending_buffers;
}
//Trying to replay the sequence
head = audio_codec.pending_buffers;
while(head && head->packet_id == audio_codec.last_packet_id + 1) {
if(!replay_head) {
replay_head = audio_codec.pending_buffers;
}
this->packet_queue.last_packet_id++;
prv_head = head;
head = head->next;
}
this->packet_queue.pending_buffers = head;
audio_codec.last_packet_id++;
prv_head = head;
head = head->next;
}
audio_codec.pending_buffers = head;
if(prv_head) {
prv_head->next = nullptr; /* mark the tail */
} else {
assert(!replay_head); /* could not be set, else prv_head would be set */
if(prv_head) {
prv_head->next = nullptr; /* mark the tail */
} else {
assert(!replay_head); /* could not be set, else prv_head would be set */
//No packet found here, test if we've more than n packets in a row somewhere
//No packet found here, test if we've more than n packets in a row somewhere
#define SKIP_SEQ_LENGTH (3)
EncodedBuffer* skip_ptr[SKIP_SEQ_LENGTH + 1];
memset(skip_ptr, 0, sizeof(skip_ptr));
skip_ptr[0] = audio_codec.pending_buffers;
EncodedBuffer* skip_ptr[SKIP_SEQ_LENGTH + 1];
memset(skip_ptr, 0, sizeof(skip_ptr));
skip_ptr[0] = this->packet_queue.pending_buffers;
while(skip_ptr[0]->next) {
for(size_t i = 0; i < SKIP_SEQ_LENGTH; i++) {
if(!skip_ptr[i]->next || skip_ptr[i]->packet_id + 1 != skip_ptr[i]->next->packet_id) {
break;
}
while(skip_ptr[0]->next) {
for(size_t i = 0; i < SKIP_SEQ_LENGTH; i++) {
if(!skip_ptr[i]->next || skip_ptr[i]->packet_id + 1 != skip_ptr[i]->next->packet_id) {
break;
}
skip_ptr[i + 1] = skip_ptr[i]->next;
}
skip_ptr[i + 1] = skip_ptr[i]->next;
}
if(skip_ptr[SKIP_SEQ_LENGTH]) {
break;
}
if(skip_ptr[SKIP_SEQ_LENGTH]) {
break;
}
skip_ptr[0] = skip_ptr[0]->next;
}
skip_ptr[0] = skip_ptr[0]->next;
}
if(skip_ptr[SKIP_SEQ_LENGTH]) {
/* we've three packets in a row */
replay_head = audio_codec.pending_buffers;
audio_codec.pending_buffers = skip_ptr[SKIP_SEQ_LENGTH];
skip_ptr[SKIP_SEQ_LENGTH - 1]->next = nullptr;
log_trace(category::voice_connection, tr("Skipping from {} to {} because of {} packets in a row"), audio_codec.last_packet_id, replay_head->packet_id, SKIP_SEQ_LENGTH);
if(skip_ptr[SKIP_SEQ_LENGTH]) {
/* we've three packets in a row */
replay_head = this->packet_queue.pending_buffers;
this->packet_queue.pending_buffers = skip_ptr[SKIP_SEQ_LENGTH];
skip_ptr[SKIP_SEQ_LENGTH - 1]->next = nullptr;
log_trace(category::voice_connection, tr("Skipping from {} to {} because of {} packets in a row"), this->packet_queue.last_packet_id, replay_head->packet_id, SKIP_SEQ_LENGTH);
/*
* Do not set process_pending to false, because we're not done
* We're just replaying all loose packets which are not within a sequence until we reach a sequence
* In the next loop the sequence will be played
*/
} else {
head = audio_codec.pending_buffers;
while(head) {
if(packet_id_diff(audio_codec.last_packet_id, head->packet_id) >= 5) {
break;
}
/*
* Do not set process_pending to false, because we're not done
* We're just replaying all loose packets which are not within a sequence until we reach a sequence
* In the next loop the sequence will be played
*/
} else {
head = this->packet_queue.pending_buffers;
while(head) {
if(packet_id_diff(this->packet_queue.last_packet_id, head->packet_id) >= 5) {
break;
}
head = head->next;
}
head = head->next;
}
if(head) {
replay_head = audio_codec.pending_buffers;
audio_codec.pending_buffers = head->next;
head->next = nullptr;
log_trace(category::voice_connection, tr("Skipping from {} to {} because of over 6 packets between"),
audio_codec.last_packet_id, replay_head->packet_id);
/* do not negate process_pending here. Same reason as with the 3 sequence */
} else {
/* no packets we're willing to replay */
audio_codec.process_pending = false;
}
}
}
}
if(head) {
replay_head = this->packet_queue.pending_buffers;
this->packet_queue.pending_buffers = head->next;
head->next = nullptr;
log_trace(category::voice_connection, tr("Skipping from {} to {} because of over 6 packets between"),
this->packet_queue.last_packet_id, replay_head->packet_id);
/* do not negate process_pending here. Same reason as with the 3 sequence */
} else {
/* no packets we're willing to replay */
this->packet_queue.process_pending = false;
}
}
}
}
if(!replay_head) {
audio_codec.process_pending = false;
break;
}
if(!replay_head) {
this->packet_queue.process_pending = false;
break;
}
{
auto head = replay_head;
while(head->next) {
head = head->next;
}
{
auto head = replay_head;
while(head->next) {
head = head->next;
}
audio_codec.last_packet_id = head->packet_id;
const auto ordered = !audio_codec.pending_buffers || packet_id_less(audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id, 10);
if(!ordered) {
log_critical(category::voice_connection, tr("Unordered packet ids. [!audio_codec.pending_buffers: {}; a: {}; b: {}]"),
!audio_codec.pending_buffers,
audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id
);
//assert(!audio_codec.pending_buffers || packet_id_less(audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id, 10));
}
}
lock.unlock();
this->packet_queue.last_packet_id = head->packet_id;
const auto ordered = !this->packet_queue.pending_buffers || packet_id_less(this->packet_queue.last_packet_id, this->packet_queue.pending_buffers->packet_id, 10);
if(!ordered) {
log_critical(category::voice_connection, tr("Unordered packet ids. [!this->packet_queue.pending_buffers: {}; a: {}; b: {}]"),
!this->packet_queue.pending_buffers,
this->packet_queue.last_packet_id, this->packet_queue.pending_buffers->packet_id
);
//assert(!this->packet_queue.pending_buffers || packet_id_less(this->packet_queue.last_packet_id, this->packet_queue.pending_buffers->packet_id, 10));
}
}
lock.unlock();
while(replay_head) {
if(replay_head->buffer.empty()) {
switch(this->state_) {
case state::playing:
case state::buffering:
this->set_state(state::stopping);
log_debug(category::voice_connection, tr("Client {} send a stop signal. Flushing stream and stopping"), this->client_id_);
break;
while(replay_head) {
if(replay_head->buffer.empty()) {
switch(this->state_) {
case state::playing:
case state::buffering:
this->set_state(state::stopping);
log_debug(category::voice_connection, tr("Client {} send a stop signal. Flushing stream and stopping"), this->client_id_);
break;
case state::stopping:
case state::stopped:
break;
case state::stopping:
case state::stopped:
break;
default:
assert(false);
break;
}
} else {
auto lost_packets = packet_id_diff(local_last_pid, replay_head->packet_id) - 1;
if(lost_packets > 10) {
log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream ({} to {}). Resetting decoder."), this->client_id_, lost_packets, local_last_pid, replay_head->packet_id);
replay_head->reset_decoder = true;
} else if(lost_packets > 0) {
log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream. FEC decoding it."), this->client_id_, lost_packets);
/*
if(audio_codec.converter->decode_lost(error, lost_packets))
log_warn(category::audio, tr("Failed to decode lost packets for client {}: {}"), this->_client_id, error);
*/
auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer, true);
if(decoded) {
this->output_source->enqueue_samples(decoded->sample_data, decoded->sample_size);
}
}
default:
assert(false);
break;
}
} else {
bool reset_decoder{false};
auto lost_packets = packet_id_diff(local_last_pid, replay_head->packet_id) - 1;
if(lost_packets > 10) {
log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream ({} to {}). Resetting decoder."), this->client_id_, lost_packets, local_last_pid, replay_head->packet_id);
reset_decoder = true;
} else if(lost_packets > 0) {
log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream. FEC decoding it."), this->client_id_, lost_packets);
/*
if(this->packet_queue.converter->decode_lost(error, lost_packets))
log_warn(category::audio, tr("Failed to decode lost packets for client {}: {}"), this->_client_id, error);
*/
bool is_new_audio_stream;
switch(this->state_) {
case state::stopped:
case state::stopping:
is_new_audio_stream = true;
break;
/* TODO: Notify the decoder about the lost decode packet? */
/* Reconstructing and replaying the lost packet by fec decoding the next known packet */
this->playback_audio_packet(replay_head->codec, replay_head->buffer.data_ptr(), replay_head->buffer.length(), true);
}
case state::buffering:
case state::playing:
is_new_audio_stream = false;
break;
bool is_new_audio_stream;
switch(this->state_) {
case state::stopped:
case state::stopping:
is_new_audio_stream = true;
break;
default:
assert(false);
is_new_audio_stream = false;
break;
}
case state::buffering:
case state::playing:
is_new_audio_stream = false;
break;
if(replay_head->reset_decoder || is_new_audio_stream) {
audio_codec.converter->reset_decoder();
replay_head->reset_decoder = false;
default:
assert(false);
is_new_audio_stream = false;
break;
}
#if 1 /* Better approch */
/* initialize with last packet */
static constexpr auto kTempBufferLength{16384};
char temp_target_buffer[kTempBufferLength];
if(kTempBufferLength >= audio_codec.converter->expected_decoded_length(replay_head->buffer.data_ptr(), replay_head->buffer.length())) {
audio_codec.converter->decode(error, replay_head->buffer.data_ptr(), replay_head->buffer.length(), temp_target_buffer, true);
} else {
//TODO: May a small warning here?
}
#endif
}
if(reset_decoder || is_new_audio_stream) {
this->reset_decoder(false);
}
#if 0 /* (maybe) TS3 approch */
if(replay_head->head) {
/* initialize with last packet */
char target_buffer[target_buffer_length];
if(target_buffer_length > audio_codec.converter->expected_decoded_length(replay_head->buffer.data_ptr(), replay_head->buffer.length())) {
audio_codec.converter->decode(error, replay_head->buffer.data_ptr(), replay_head->buffer.length(), target_buffer, 1);
} else {
//TODO: May a small warning here?
}
}
#endif
this->playback_audio_packet(replay_head->codec, replay_head->buffer.data_ptr(), replay_head->buffer.length(), false);
}
//TODO: Use statically allocated buffer?
auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer, false);
if(decoded) {
if(is_new_audio_stream) {
log_warn(category::audio, tr("New audio chunk for client {}"), this->client_id_);
local_last_pid = replay_head->packet_id;
//this->output_source->enqueue_silence((size_t) ceil(0.0075f * (float) this->output_source->sample_rate)); /* enqueue 7.5ms silence so we give the next packet a chance to be send */
}
delete std::exchange(replay_head, replay_head->next);
}
this->output_source->enqueue_samples(decoded->sample_data, decoded->sample_size);
this->set_state(state::playing);
}
}
/*
* Needs to be locked when entering the loop.
* We'll check for more packets.
*/
lock.lock();
};
}
local_last_pid = replay_head->packet_id;
auto last_head = replay_head;
replay_head = replay_head->next;
delete last_head;
}
/*
* Needs to be locked when entering the loop.
* We'll check for more packets.
*/
lock.lock();
};
}
if(reschedule) {
log_warn(category::voice_connection, tr("Audio data decode will take longer than {} us. Enqueueing for later"),
chrono::duration_cast<chrono::microseconds>(max_time).count());
audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(this->ref()));
void VoiceClient::reset_decoder(bool deallocate) {
this->decoder.decoder_initialized = false;
if(deallocate) {
this->decoder.decoder = nullptr;
this->decoder.resampler = nullptr;
this->decoder.current_codec = AudioCodec::Unknown;
} else if(this->decoder.decoder) {
this->decoder.decoder->reset_sequence();
}
}
void VoiceClient::initialize_code(const codec::value &audio_codec) {
assert(this->output_source);
constexpr static auto kTempBufferSampleSize{1024 * 8};
void VoiceClient::playback_audio_packet(uint8_t protocol_codec_id, const void *payload, size_t payload_size, bool fec_decode) {
auto payload_codec = audio::codec::audio_codec_from_protocol_id(protocol_codec_id);
if(!payload_codec.has_value()) {
log_trace(category::audio, tr("Received packet with unknown audio codec id ({})."), (size_t) protocol_codec_id);
return;
}
string error;
if(this->decoder.current_codec != *payload_codec) {
if(fec_decode) {
log_debug(category::audio, tr("Trying to fec decode audio packet but decoder hasn't been initialized with that codec. Dropping attempt."));
return;
}
auto& codec_data = this->codec[audio_codec];
if(codec_data.state != AudioCodec::State::UNINITIALIZED) {
log_warn(category::voice_connection, tr("Could not initialize codec of type {} because it isn't in uninitialized state anymore!"), (int) codec_data.state);
this->decoder.decoder_initialized = false;
this->decoder.decoder = nullptr;
this->decoder.current_codec = *payload_codec;
if(!audio::codec::audio_decode_supported(this->decoder.current_codec)) {
log_warn(category::voice_connection, tr("Client {} using and unsupported audio codec ({}). Dropping all its audio data."),
this->client_id_, audio::codec::audio_codec_name(this->decoder.current_codec));
return;
}
this->decoder.decoder = audio::codec::create_audio_decoder(this->decoder.current_codec);
if(!this->decoder.decoder) {
log_error(category::voice_connection, tr("Failed to create decoder for audio codec {}."), audio::codec::audio_codec_name(this->decoder.current_codec));
return;
}
std::string error{};
if(!this->decoder.decoder->initialize(error)) {
log_error(category::voice_connection, tr("Failed to initialize {} decoder: {}"), audio::codec::audio_codec_name(this->decoder.current_codec), error);
this->decoder.decoder = nullptr;
return;
}
}
if(!this->decoder.decoder) {
/* Decoder failed to initialize. Dropping all packets. */
return;
}
codec_data.codec = audio_codec;
float temp_buffer[kTempBufferSampleSize];
const auto decoder_channel_count = this->decoder.decoder->channel_count();
auto info = codec::get_info(audio_codec);
if(!info || !info->supported) {
log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Codec is not supported"), audio_codec, this->client_id_);
codec_data.state = AudioCodec::State::UNSUPPORTED;
return;
if(!this->decoder.decoder_initialized) {
if(fec_decode) {
log_debug(category::audio, tr("Trying to fec decode audio packet but decoder hasn't been initialized with that codec. Dropping attempt."));
return;
}
/*
* We're fec decoding so we need to pass the amount of samples we want to decode.
* Usually a network packet contains 20ms of audio data.
*/
auto sample_count{(size_t) (this->decoder.decoder->sample_rate() * 0.02)};
DecodePayloadInfo decode_info{};
decode_info.fec_decode = true;
decode_info.byte_length = payload_size;
std::string error{};
if(!this->decoder.decoder->decode(error, temp_buffer, sample_count, decode_info, payload)) {
log_warn(category::audio, tr("Failed to initialize decoder with fec data: {}"), error);
}
this->decoder.decoder_initialized = true;
}
codec_data.state = AudioCodec::State::INITIALIZED_FAIL;
codec_data.converter = info->new_converter(error);
if(!codec_data.converter) {
log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize decoder: {}"), audio_codec, this->client_id_, error);
return;
}
codec_data.resampler = make_shared<audio::AudioResampler>(codec_data.converter->sample_rate(), this->output_source->sample_rate(), this->output_source->channel_count());
if(!codec_data.resampler->valid()) {
log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize resampler"), audio_codec, this->client_id_);
return;
}
size_t current_sample_rate;
size_t current_sample_count;
size_t current_channel_count;
{
std::string error{};
auto sample_count{kTempBufferSampleSize / decoder_channel_count};
if(fec_decode) {
/*
* See notes for the decoder initialisation.
*/
sample_count = (size_t) (this->decoder.decoder->sample_rate() * 0.02);
}
codec_data.state = AudioCodec::State::INITIALIZED_SUCCESSFULLY;
log_trace(category::voice_connection, tr("Successfully initialized codec {} for client {}."), audio_codec, this->client_id_);
DecodePayloadInfo decode_info{};
decode_info.fec_decode = fec_decode;
decode_info.byte_length = payload_size;
if(!this->decoder.decoder->decode(error, temp_buffer, sample_count, decode_info, payload)) {
log_warn(category::audio, tr("Failed to decode audio packet (fec: {}): {}"), fec_decode, error);
}
current_sample_count = sample_count;
current_channel_count = this->decoder.decoder->channel_count();
current_sample_rate = this->decoder.decoder->sample_rate();
}
auto audio_output = this->output_source;
if(!audio_output) {
/*
* We have no target to replay the audio.
* We're only doing it here and not earlier to provide the decoder with the required info of the audio sequence
* so when we actually have audio we're not lacking behind and having some artefacts.
*/
return;
}
if(this->volume_ == 0) {
/* Client has been muted */
return;
}
const auto output_channel_count = audio_output->channel_count();
if(current_channel_count != output_channel_count) {
if(kTempBufferSampleSize < output_channel_count * current_sample_count) {
log_error(category::voice_connection, tr("Temporary buffer can't hold {} samples ({} channels) of audio data. Audio frame too big. Dropping it."), current_sample_count, output_channel_count);
return;
}
if(!audio::merge::merge_channels_interleaved(temp_buffer, output_channel_count, temp_buffer, current_channel_count, current_sample_count)) {
log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count!"));
return;
}
current_channel_count = output_channel_count;
}
const auto output_sample_rate = audio_output->sample_rate();
if(current_sample_rate != output_sample_rate) {
if(!this->decoder.resampler || this->decoder.resampler->output_rate() != output_sample_rate || this->decoder.resampler->input_rate() != current_sample_rate) {
this->decoder.resampler = std::make_unique<audio::AudioResampler>(current_sample_rate, output_sample_rate, output_channel_count);
}
auto expected_output_samples = this->decoder.resampler->estimated_output_size(current_sample_count);
if(expected_output_samples * output_channel_count > kTempBufferSampleSize) {
log_error(category::voice_connection, tr("Temporary buffer can't hold the full resampled frame. Dropping it."), current_sample_count, output_channel_count);
return;
}
size_t output_samples{expected_output_samples};
if(!this->decoder.resampler->process(temp_buffer, temp_buffer, current_sample_count, output_samples)) {
log_error(category::voice_connection, tr("Failed to resample audio codec sample rate to our audio output sample rate. Dropping audio frame."));
return;
}
current_sample_rate = output_sample_rate;
current_sample_count = output_samples;
}
audio::apply_gain(temp_buffer, current_channel_count, current_sample_count, this->volume_);
assert(audio_output->sample_rate() == current_sample_rate);
assert(audio_output->channel_count() == current_channel_count);
audio_output->enqueue_samples(temp_buffer, current_sample_count);
this->set_state(state::playing);
}
std::shared_ptr<audio::SampleBuffer> VoiceClient::decode_buffer(const codec::value &audio_codec, const pipes::buffer_view &buffer, bool fec) {
assert(this->output_source);
auto& codec_data = this->codec[audio_codec];
if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) {
log_trace(category::audio, tr("Cant decode auto buffer of codec {} because codec isn't successfully initialized (state: {})"), audio_codec, (int) codec_data.state);
return nullptr;
}
string error;
char target_buffer[TEMP_BUFFER_LENGTH];
if(TEMP_BUFFER_LENGTH < codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length())) {
log_warn(category::voice_connection, tr("Failed to decode audio data. Target buffer is smaller then expected bytes ({} < {})"), TEMP_BUFFER_LENGTH, codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length()));
return nullptr;
}
auto samples = codec_data.converter->decode(error, buffer.data_ptr(), buffer.length(), target_buffer, fec);
if(samples < 0) {
log_warn(category::voice_connection, tr("Failed to decode audio data: {}"), error);
return nullptr;
}
if(!audio::merge::merge_channels_interleaved(target_buffer, this->output_source->channel_count(), target_buffer, codec_data.converter->channels(), samples)) {
log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count!"));
return nullptr;
}
auto estimated_output_samples = codec_data.resampler->estimated_output_size(samples);
if(TEMP_BUFFER_LENGTH < estimated_output_samples * this->output_source->channel_count() * sizeof(float)) {
log_warn(category::voice_connection, tr("Failed to resample audio data. Target buffer is smaller then expected bytes ({} < {})"), TEMP_BUFFER_LENGTH, (codec_data.resampler->estimated_output_size(samples) * this->output_source->channel_count() * 4));
return nullptr;
}
auto resampled_samples{estimated_output_samples};
if(!codec_data.resampler->process(target_buffer, target_buffer, samples, resampled_samples)) {
log_warn(category::voice_connection, tr("Failed to resample audio data."));
return nullptr;
}
if(!resampled_samples) {
/* we don't seem to have any output samples */
return nullptr;
}
audio::apply_gain(target_buffer, this->output_source->channel_count(), resampled_samples, this->volume_);
auto audio_buffer = audio::SampleBuffer::allocate((uint8_t) this->output_source->channel_count(), (uint16_t) resampled_samples);
audio_buffer->sample_index = 0;
memcpy(audio_buffer->sample_data, target_buffer, this->output_source->channel_count() * resampled_samples * 4);
return audio_buffer;
}
void VoiceClient::event_execute_dropped(const std::chrono::system_clock::time_point &point) { }
/*
* This method will be called within the audio event loop.
*/
@ -855,15 +805,13 @@ bool VoiceClient::handle_output_underflow(size_t sample_count) {
* Seems like we don't have any data for a bit longer already.
* Lets check if we timeout this stream.
*/
if(this->_last_received_packet + std::chrono::seconds{1} < std::chrono::system_clock::now()) {
if(this->packet_queue.last_packet_timestamp + std::chrono::seconds{1} < std::chrono::system_clock::now()) {
this->set_state(state::stopped);
log_warn(category::audio, tr("Clients {} audio stream timed out. We haven't received any audio packed within the last second. Stopping replay."), this->client_id_, sample_count);
break;
} else {
/*
* Lets wait until we have the next audio packet.
*/
break;
}
break;

View File

@ -4,6 +4,7 @@
#include <nan.h>
#include <include/NanEventCallback.h>
#include <functional>
#include <optional>
#include <pipes/buffer.h>
#include "../../audio/AudioResampler.h"
#include "../../audio/codec/Converter.h"
@ -16,47 +17,8 @@ namespace tc::connection {
class VoiceConnection;
class VoiceClient;
namespace codec {
enum value {
MIN = 0,
SPEEX_NARROWBAND = 0,
SPEEX_WIDEBAND = 1,
SPEEX_ULTRA_WIDEBAND = 2,
CELT_MONO = 3,
OPUS_VOICE = 4,
OPUS_MUSIC = 5,
MAX = 5,
};
struct CodecInfo {
bool supported;
std::string name;
std::function<std::shared_ptr<audio::codec::Converter>(std::string&)> new_converter;
};
extern const CodecInfo info[6];
inline const CodecInfo* get_info(value codec) {
if(codec > value::MAX || codec < value::MIN) {
return nullptr;
}
return &info[codec];
}
}
class VoiceClient : private event::EventEntry {
class VoiceClient : public event::EventEntry {
friend class VoiceConnection;
#ifdef WIN32
template<typename _Tp, typename _Up>
friend _NODISCARD std::shared_ptr<_Tp> std::static_pointer_cast(std::shared_ptr<_Up>&& _Other) noexcept;
#else
template<typename _Tp, typename _Up>
friend inline std::shared_ptr<_Tp> std::static_pointer_cast(const std::shared_ptr<_Up>& __r) noexcept;
#endif
public:
struct state {
enum value {
@ -90,7 +52,7 @@ namespace tc::connection {
inline std::shared_ptr<VoiceClient> ref() { return this->ref_.lock(); }
void process_packet(uint16_t packet_id, const pipes::buffer_view& /* buffer */, codec::value /* codec */, bool /* head */);
void process_packet(uint16_t packet_id, const pipes::buffer_view& /* buffer */, uint8_t /* payload codec id */, bool /* head */);
void execute_tick();
inline float get_volume() const { return this->volume_; }
@ -106,38 +68,20 @@ namespace tc::connection {
private:
struct EncodedBuffer {
bool head{false};
bool reset_decoder{false};
std::chrono::system_clock::time_point receive_timestamp;
pipes::buffer buffer;
codec::value codec{codec::MIN};
uint8_t codec{0xFF};
uint16_t packet_id{0};
EncodedBuffer* next{nullptr};
};
struct AudioCodec {
enum struct State {
UNINITIALIZED,
INITIALIZED_SUCCESSFULLY,
INITIALIZED_FAIL,
UNSUPPORTED,
};
codec::value codec{};
struct {
uint16_t last_packet_id{0xFFFF}; /* the first packet id is 0 so one packet before is 0xFFFF */
std::chrono::system_clock::time_point last_packet_timestamp{};
[[nodiscard]] inline std::chrono::system_clock::time_point stream_timeout() const {
return this->last_packet_timestamp + std::chrono::milliseconds{1000};
}
State state{State::UNINITIALIZED};
std::shared_ptr<audio::codec::Converter> converter{nullptr};
std::shared_ptr<audio::AudioResampler> resampler{nullptr};
std::mutex pending_lock{};
EncodedBuffer* pending_buffers{nullptr};
@ -145,10 +89,19 @@ namespace tc::connection {
EncodedBuffer* force_replay{nullptr};
bool process_pending{false};
};
std::array<AudioCodec, codec::MAX + 1> codec{};
void initialize_code(const codec::value& /* codec */);
[[nodiscard]] inline std::chrono::system_clock::time_point stream_timeout() const {
return this->last_packet_timestamp + std::chrono::milliseconds{1000};
}
} packet_queue;
struct {
/* the decoder has been initialized with fec data */
bool decoder_initialized{false};
audio::codec::AudioCodec current_codec{audio::codec::AudioCodec::Unknown};
std::unique_ptr<audio::codec::AudioDecoder> decoder{};
std::unique_ptr<audio::AudioResampler> resampler{};
} decoder;
/* might be null (if audio hasn't been initialized) */
std::shared_ptr<audio::AudioOutputSource> output_source{};
@ -159,7 +112,6 @@ namespace tc::connection {
uint16_t client_id_{0};
float volume_{1.f};
std::chrono::system_clock::time_point _last_received_packet{};
state::value state_{state::stopped};
inline void set_state(state::value value) {
if(value == this->state_) {
@ -177,12 +129,19 @@ namespace tc::connection {
void drop_enqueued_buffers();
void event_execute(const std::chrono::system_clock::time_point &point) override;
void event_execute_dropped(const std::chrono::system_clock::time_point &point) override;
bool handle_output_underflow(size_t sample_count);
/* its recommend to call this in correct packet oder */
std::shared_ptr<audio::SampleBuffer> decode_buffer(const codec::value& /* codec */,const pipes::buffer_view& /* buffer */, bool /* fec */);
/**
* Reset the decoder.
*/
void reset_decoder(bool /* deallocate */);
/**
* Decode and playback an audio packet.
* If fec decode is active we try to decode the fec data within the packet and playback them instead of the packet data.
* Note: If fec is set and the decoder hasn't been initialized we'll drop the buffer.
*/
void playback_audio_packet(uint8_t /* codec protocol id */, const void* /* buffer */, size_t /* buffer length */, bool /* use fec data */);
};

View File

@ -211,7 +211,8 @@ NAN_METHOD(VoiceConnectionWrap::get_encoder_codec) {
return;
}
info.GetReturnValue().Set(handle->get_encoder_codec());
auto codec = handle->get_encoder_codec();
info.GetReturnValue().Set(audio::codec::audio_codec_to_protocol_id(codec).value_or(-1));
}
NAN_METHOD(VoiceConnectionWrap::set_encoder_codec) {
@ -228,7 +229,12 @@ NAN_METHOD(VoiceConnectionWrap::set_encoder_codec) {
return;
}
handle->set_encoder_codec((uint8_t) info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0));
auto codec = audio::codec::audio_codec_from_protocol_id(info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(-1));
if(!codec.has_value()) {
Nan::ThrowError("unknown codec id");
return;
}
handle->set_encoder_codec(*codec);
}
NAN_METHOD(VoiceConnectionWrap::enable_voice_send) {
@ -275,7 +281,7 @@ void VoiceConnectionWrap::release_recorder() {
VoiceConnection::VoiceConnection(ServerConnection *handle) : _handle(handle) {
this->_voice_sender = make_shared<VoiceSender>(this);
this->_voice_sender->_ref = this->_voice_sender;
this->_voice_sender->set_codec(codec::OPUS_MUSIC);
this->_voice_sender->set_codec(audio::codec::AudioCodec::OpusMusic);
}
VoiceConnection::~VoiceConnection() {
if(v8::Isolate::GetCurrent())
@ -369,24 +375,23 @@ void VoiceConnection::process_packet(const ts::protocol::PacketParser &packet) {
}
if(payload.length() > 5) {
client->process_packet(packet_id, payload.view(5), (codec::value) codec_id, flag_head);
client->process_packet(packet_id, payload.view(5), codec_id, flag_head);
} else {
client->process_packet(packet_id, pipes::buffer_view{nullptr, 0}, (codec::value) codec_id, flag_head);
client->process_packet(packet_id, pipes::buffer_view{nullptr, 0}, codec_id, flag_head);
}
} else {
//TODO implement whisper
}
}
void VoiceConnection::set_encoder_codec(const uint8_t &codec) {
if(codec > codec::MAX) return;
void VoiceConnection::set_encoder_codec(const audio::codec::AudioCodec &codec) {
auto vs = this->_voice_sender;
if(vs)
vs->set_codec((codec::value) codec);
if(vs) {
vs->set_codec(codec);
}
}
uint8_t VoiceConnection::get_encoder_codec() {
audio::codec::AudioCodec VoiceConnection::get_encoder_codec() {
auto vs = this->_voice_sender;
return vs ? vs->get_codec() : 0;
return vs ? vs->target_codec() : audio::codec::AudioCodec::Unknown;
}

View File

@ -6,6 +6,7 @@
#include <mutex>
#include <functional>
#include <protocol/Packet.h>
#include "../../audio/codec/Converter.h"
namespace tc {
namespace audio::recorder {
@ -88,8 +89,8 @@ namespace tc {
void process_packet(const ts::protocol::PacketParser&);
void set_encoder_codec(const uint8_t& /* codec */);
uint8_t get_encoder_codec();
void set_encoder_codec(const audio::codec::AudioCodec& /* target */);
audio::codec::AudioCodec get_encoder_codec();
private:
ServerConnection* _handle;
std::weak_ptr<VoiceConnection> _ref;