Teaspeak-Server/server/src/client/voice/VoiceClientConnection.cpp

704 lines
28 KiB
C++

#include <misc/endianness.h>
#include <algorithm>
#include <log/LogUtils.h>
#include "../../server/VoiceServer.h"
#include <misc/memtracker.h>
#include <protocol/Packet.h>
#include <ThreadPool/Timer.h>
#include "VoiceClientConnection.h"
#include "src/client/ConnectedClient.h"
#include "VoiceClient.h"
/*
 * Compile time switches for this translation unit.
 *
 * FUZZING_TESTING_INCOMMING / FUZZING_TESTING_OUTGOING enable the random packet dropping
 * in handle_incoming_datagram() / pop_write_buffer().
 * FIZZING_TESTING_DISABLE_HANDSHAKE restricts that dropping to clients in the CONNECTED state.
 * A packet is dropped when (rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP,
 * which means 8 out of 10 packets with the values below.
 * CONNECTION_NO_STATISTICS removes the per packet statistic logging.
 */
//#define LOG_AUTO_ACK_AUTORESPONSE
//#define FUZZING_TESTING_INCOMMING
//#define FUZZING_TESTING_OUTGOING
//#define FIZZING_TESTING_DISABLE_HANDSHAKE
#define FUZZING_TESTING_DROP 8
#define FUZZING_TESTING_DROP_MAX 10
//#define CONNECTION_NO_STATISTICS
/* NOTE(review): presumably consumed by the QuickLZ header included right below - confirm */
#define QLZ_COMPRESSION_LEVEL 1
#include "qlz/QuickLZ.h"
using namespace std;
using namespace std::chrono;
using namespace ts;
using namespace ts::connection;
using namespace ts::protocol;
using namespace ts::server;
/**
 * Creates the connection state for the given voice client.
 *
 * Registers the new instance at the memory tracker, resets the crypt handler
 * and writes a debug message containing the address of the instance.
 *
 * @param client the voice client this connection belongs to.
 */
VoiceClientConnection::VoiceClientConnection(VoiceClient* client) : client(client) {
    memtrack::allocated<VoiceClientConnection>(this);

    this->crypt_handler.reset();

    debugMessage(
            client->getServer()->getServerId(),
            "Allocated new voice client connection at {}",
            (void*) this
    );
}
/**
 * Drops every pending outgoing buffer and packet, clears the client pointer
 * and unregisters the instance from the memory tracker.
 */
VoiceClientConnection::~VoiceClientConnection() {
    /* Nobody should be using this connection anymore; the locks are taken purely defensively. */
    {
        lock_guard write_guard(this->write_queue_lock);
        this->write_queue.clear();
    }

    for(auto& preprocess_queue : this->write_preprocess_queues) {
        lock_guard work_guard{preprocess_queue.work_lock};
        lock_guard queue_guard{preprocess_queue.queue_lock};

        preprocess_queue.queue.clear();
    }

    this->client = nullptr;
    memtrack::freed<VoiceClientConnection>(this);
}
/**
 * Asks the voice server of the client to schedule a write for this client.
 * Does nothing when the client has no voice server assigned.
 */
void VoiceClientConnection::triggerWrite() {
    if(!this->client->voice_server)
        return;

    this->client->voice_server->triggerWrite(
            dynamic_pointer_cast<VoiceClient>(this->client->_this.lock())
    );
}
/*
 * Stream style log prefix in the form "[<peer ip>:<peer port> | <display name>]".
 * Any previous definition is replaced. The expansion accesses this->client, so it can only be used
 * within member functions of VoiceClientConnection.
 */
#ifdef CLIENT_LOG_PREFIX
#undef CLIENT_LOG_PREFIX
#endif
#define CLIENT_LOG_PREFIX "[" << this->client->getPeerIp() << ":" << this->client->getPeerPort() << " | " << this->client->getDisplayName() << "]"
//Message handle methods
/*
 * Entry point for every datagram received for this client.
 *
 * Steps, in order:
 *  1. (fuzzing builds only) randomly drop the datagram.
 *  2. Parse the datagram; invalid packets are dropped.
 *  3. Estimate the packet generation and log the packet within the statistics.
 *  4. For COMMAND / COMMAND_LOW packets: pretest whether the packet id is accepted by the command fragment buffer.
 *  5. While the client is in INIT_LOW state drop everything except INIT1 packets.
 *  6. Decrypt encrypted packets; unencrypted command packets are only accepted in INIT_HIGH state.
 *  7. Command packets get inserted into the fragment buffer, acknowledged and their handling gets scheduled.
 *     All other packets are dispatched directly to the voice / ack / ping handlers of the client.
 *
 * buffer: the received datagram. The decryption writes through packet_parser.mutable_data_ptr().
 */
void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& buffer) {
#ifdef FUZZING_TESTING_INCOMMING
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
if (this->client->state == ConnectionState::CONNECTED) {
#endif
if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) {
debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping incoming packet of length {}", CLIENT_STR_LOG_PREFIX_(this->client), buffer.length());
return;
}
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
}
#endif
#endif
ClientPacketParser packet_parser{buffer};
if(!packet_parser.valid()) {
logTrace(this->client->getServerId(), "{} Received invalid packet. Dropping.", CLIENT_STR_LOG_PREFIX_(this->client));
return;
}
/* NOTE(review): the type is only range checked by this assert, which is compiled out when NDEBUG is defined - confirm that valid() guarantees the range */
assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size());
/* every packet type has its own generation estimator */
packet_parser.set_estimated_generation(this->incoming_generation_estimators[packet_parser.type()].visit_packet(packet_parser.packet_id()));
#ifndef CONNECTION_NO_STATISTICS
/* NOTE(review): this->client has already been dereferenced above, so this null check can not protect anything */
if(this->client) {
auto stats = this->client->connectionStatistics;
stats->logIncomingPacket(stats::ConnectionStatistics::category::from_type(packet_parser.type()), buffer.length() + 96); /* 96 for the UDP packet overhead */
}
this->packet_statistics().received_packet((protocol::PacketType) packet_parser.type(), packet_parser.full_packet_id());
#endif
auto is_command = packet_parser.type() == protocol::COMMAND || packet_parser.type() == protocol::COMMAND_LOW;
/* pretest if the packet is worth the effort of decoding it */
if(is_command) {
/* handle the order stuff */
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())];
unique_lock queue_lock(fragment_buffer.buffer_lock);
/* 0 means the index is accepted; -1 is treated as underflow, every other value as overflow */
auto result = fragment_buffer.accept_index(packet_parser.packet_id());
if(result != 0) { /* packet index is ahead buffer index */
debugMessage(this->client->getServerId(), "{} Dropping command packet because command assembly buffer has an {} ({}|{}|{})",
CLIENT_STR_LOG_PREFIX_(this->client),
result == -1 ? "underflow" : "overflow",
fragment_buffer.capacity(),
fragment_buffer.current_index(),
packet_parser.packet_id()
);
if(result == -1) { /* underflow */
/* we've already got the packet, but the client doesn't know that so we've to send the acknowledge again */
if(this->client->crypto.protocol_encrypted)
this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
}
return;
}
}
//NOTICE I found out that the Compressed flag is set if the packet contains an audio header
/* in INIT_LOW state only INIT1 packets are accepted */
if(this->client->state == ConnectionState::INIT_LOW && packet_parser.type() != protocol::INIT1)
return;
/* decrypt the packet if needed */
if(packet_parser.is_encrypted()) {
std::string error;
CryptHandler::key_t crypt_key{};
CryptHandler::nonce_t crypt_nonce{};
auto data = (uint8_t*) packet_parser.mutable_data_ptr();
/* as long as the protocol is not encrypted the default key/nonce is used */
bool use_default_key{!this->client->crypto.protocol_encrypted}, decrypt_result;
/* jump target for the single retry with the default key (see below) */
decrypt_packet:
if(use_default_key) {
crypt_key = CryptHandler::default_key;
crypt_nonce = CryptHandler::default_nonce;
} else {
if(!this->crypt_handler.generate_key_nonce(true, packet_parser.type(), packet_parser.packet_id(), packet_parser.estimated_generation(), crypt_key, crypt_nonce)) {
logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
return;
}
}
decrypt_result = this->crypt_handler.decrypt(
data + ClientPacketParser::kHeaderOffset, ClientPacketParser::kHeaderLength,
data + ClientPacketParser::kPayloadOffset, packet_parser.payload_length(),
data,
crypt_key, crypt_nonce,
error
);
if(!decrypt_result) {
/*
 * As long as crypto.client_init is not set, a failed decrypt with the generated key is retried once with the default key.
 * The retry can not loop because use_default_key is set before jumping back.
 */
if(!this->client->crypto.client_init) {
if(use_default_key) {
logTrace(this->client->getServerId(), "{} Failed to decrypt packet with default key ({}). Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), error);
return;
} else {
logTrace(this->client->getServerId(), "{} Failed to decrypt packet ({}). Trying with default key.", CLIENT_STR_LOG_PREFIX_(this->client), error);
use_default_key = true;
goto decrypt_packet;
}
} else {
logTrace(this->client->getServerId(), "{} Failed to decrypt packet ({}). Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), error);
return;
}
}
packet_parser.set_decrypted();
} else if(is_command && this->client->state != ConnectionState::INIT_HIGH) {
/* unencrypted command packets are only accepted while the client is in INIT_HIGH state */
logTrace(this->client->getServerId(), "{} Voice client {}/{} tried to send a unencrypted command packet. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), client->getDisplayName(), this->client->getLoggingPeerIp());
return;
}
/* NOTE(review): the block below references a variable named "packet" which does not exist within this function; it would not compile if enabled */
#ifdef LOG_INCOMPING_PACKET_FRAGMENTS
debugMessage(lstream << CLIENT_LOG_PREFIX << "Recived packet. PacketId: " << packet->packetId() << " PacketType: " << packet->type().name() << " Flags: " << packet->flags() << " - " << packet->data() << endl);
#endif
if(is_command) {
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())];
/* the fragment entry owns a copy of the payload (own_buffer()) */
CommandFragment fragment_entry{
packet_parser.packet_id(),
packet_parser.estimated_generation(),
packet_parser.flags(),
(uint32_t) packet_parser.payload_length(),
packet_parser.payload().own_buffer()
};
{
unique_lock queue_lock(fragment_buffer.buffer_lock);
if(!fragment_buffer.insert_index(packet_parser.packet_id(), std::move(fragment_entry))) {
logTrace(this->client->getServerId(), "{} Failed to insert command packet into command packet buffer.", CLIENT_STR_LOG_PREFIX_(this->client));
return;
}
}
/* the packet is acknowledged as soon as it has been stored, the execution is scheduled afterwards */
this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
auto voice_server = this->client->voice_server;
if(voice_server)
voice_server->schedule_command_handling(this->client);
} else {
/* non command packets are handled directly within this call */
if(packet_parser.type() == protocol::VOICE || packet_parser.type() == protocol::VOICE_WHISPER)
this->client->handlePacketVoice(packet_parser);
else if(packet_parser.type() == protocol::ACK || packet_parser.type() == protocol::ACK_LOW)
this->client->handlePacketAck(packet_parser);
else if(packet_parser.type() == protocol::PING || packet_parser.type() == protocol::PONG)
this->client->handlePacketPing(packet_parser);
else {
logError(this->client->getServerId(), "{} Received hand decoded packet, but we've no method to handle it. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
}
}
}
/**
 * Tests whether the given raw packet has been encrypted with the key material of this connection.
 *
 * @param buffer the whole received packet (including mac etc).
 * @return false if the packet is invalid or not flagged as encrypted,
 *         otherwise the result of the crypt handler verification.
 */
bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer /* incl. mac etc */) {
    ClientPacketParser parser{buffer};
    if(!parser.valid())
        return false;
    if(!parser.is_encrypted())
        return false;

    assert(parser.type() >= 0 && parser.type() < this->incoming_generation_estimators.size());

    /* the generation currently tracked for this packet type is used for the verification */
    return this->crypt_handler.verify_encryption(
            buffer,
            parser.packet_id(),
            this->incoming_generation_estimators[parser.type()].generation()
    );
}
/**
 * Executes at most one reassembled command of this client and reschedules the command handling
 * if there is more work left.
 *
 * Nothing happens if the client is disconnecting (or in a later state) or has no server.
 * Exceptions derived from std::exception which escape the command handler are logged and swallowed.
 * If handling the command takes more than 10ms an error is logged.
 *
 * The time point parameter (when the execution was scheduled) is unused.
 */
void VoiceClientConnection::execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */) {
    if(this->client->state >= ConnectionState::DISCONNECTING || !this->client->getServer())
        return;

    //TODO: Remove the buffer_execute_lock and use the one within the this->client->handlePacketCommand method
    unique_lock<std::recursive_timed_mutex> buffer_execute_lock;
    pipes::buffer payload{};
    uint16_t packet_id{};

    auto reexecute_handle = this->next_reassembled_command(buffer_execute_lock, payload, packet_id);
    if(!payload.empty()) {
        /* A monotonic clock is used because the wall clock may jump while the command is handled. */
        const auto handle_begin = std::chrono::steady_clock::now();
        try {
            this->client->handlePacketCommand(payload);
        } catch (const std::exception& ex) {
            logCritical(this->client->getServerId(), "{} Exception reached root tree! {}", CLIENT_STR_LOG_PREFIX_(this->client), ex.what());
        }

        const auto handle_duration = std::chrono::steady_clock::now() - handle_begin;
        if(handle_duration > milliseconds(10)) {
            logError(this->client->getServerId(),
                     "{} Handling of command packet needs more than 10ms ({}ms)",
                     CLIENT_STR_LOG_PREFIX_(this->client),
                     duration_cast<milliseconds>(handle_duration).count()
            );
        }
    }

    /* The execute lock only has to be held while the command is being executed. */
    if(buffer_execute_lock.owns_lock())
        buffer_execute_lock.unlock();

    auto voice_server = this->client->voice_server;
    if(voice_server && (reexecute_handle || this->should_reassembled_reschedule)) {
        this->should_reassembled_reschedule = false;
        /* Use the local copy which has been null checked instead of reading the client member a second time. */
        voice_server->schedule_command_handling(this->client);
    }
}
/**
 * Takes the next completely received command out of the command fragment buffers.
 * Fragmented commands are reassembled and compressed commands are decompressed.
 *
 * The buffers are visited in their array order, so commands are handled before command low packets.
 * If one of the locks could not be acquired without blocking, should_reassembled_reschedule is set.
 *
 * @param buffer_execute_lock lock for in order execution. Receives the execute lock of the buffer
 *                            the command has been taken from. It is still locked on return.
 * @param result              receives the command payload. Untouched if no command could be provided.
 * @param packet_id           receives the packet id of the first fragment, as soon as a buffer with a set front has been found.
 * @return true if there might be more commands to process, so the caller should call again.
 */
bool VoiceClientConnection::next_reassembled_command(unique_lock<std::recursive_timed_mutex>& buffer_execute_lock, pipes::buffer& result, uint16_t& packet_id) {
    command_fragment_buffer_t* buffer{nullptr};
    unique_lock<std::recursive_timed_mutex> buffer_lock; /* general buffer lock */
    bool have_more{false};

    {
        //FIXME: Currently command low packets can't be handled if there is a command packet stuck in reassemble
        /* handle commands before command low packets */
        for(auto& buf : this->_command_fragment_buffers) {
            /* The lock of the selected buffer is kept (moved into buffer_lock), so nobody modifies the buffer while we're reassembling. */
            unique_lock ring_lock(buf.buffer_lock, try_to_lock);
            if(!ring_lock.owns_lock()) {
                this->should_reassembled_reschedule = true;
                continue;
            }

            if(buf.front_set()) {
                if(!buffer) { /* lets still test for reexecute */
                    buffer_execute_lock = unique_lock(buf.execute_lock, try_to_lock);
                    if(!buffer_execute_lock.owns_lock()) {
                        this->should_reassembled_reschedule = true;
                        continue;
                    }

                    buffer_lock = move(ring_lock);
                    buffer = &buf;
                } else {
                    /* we've already selected a buffer, but this one contains work as well */
                    have_more = true;
                    break;
                }
            }
        }
    }

    if(!buffer)
        return false; /* we've no packets */

    uint8_t packet_flags{0};
    pipes::buffer payload{};

    /* lets find out if we've to reassemble the packet */
    auto& first_buffer = buffer->slot_value(0);
    packet_id = first_buffer.packet_id;

    if(first_buffer.packet_flags & PacketFlag::Fragmented) {
        uint16_t sequence_length{1};
        size_t total_payload_length{first_buffer.payload_length};

        /* The first and the last fragment carry the Fragmented flag. Search for the closing fragment. */
        do {
            if(sequence_length >= buffer->capacity()) {
                logError(this->client->getServerId(), "{} Command fragment buffer is full, and there is not fragmented packet end. Dropping full buffer which will probably cause a connection loss.", CLIENT_STR_LOG_PREFIX_(this->client));
                buffer->clear();
                return false; /* we've nothing to handle */
            }

            if(!buffer->slot_set(sequence_length))
                return false; /* we need more packets */

            auto& packet = buffer->slot_value(sequence_length++);
            total_payload_length += packet.payload_length;
            if(packet.packet_flags & PacketFlag::Fragmented) {
                /* yep we find the end */
                break;
            }
        } while(true);

        /* ok we have all fragments lets reassemble */
        /*
         * Packet sequence could never be so long. If it is so then the data_length() returned an invalid value.
         * We're checking it here because we dont want to make a huge allocation
         */
        assert(total_payload_length < 512 * 1024 * 1024);

        pipes::buffer packet_buffer{total_payload_length};
        char* packet_buffer_ptr = &packet_buffer[0];
        size_t packet_count{0};

        packet_flags = buffer->slot_value(0).packet_flags;
        while(packet_count < sequence_length) {
            auto fragment = buffer->pop_front();
            memcpy(packet_buffer_ptr, fragment.payload.data_ptr(), fragment.payload_length);

            packet_buffer_ptr += fragment.payload_length;
            packet_count++;
        }

        /* NOTE(review): the standard macro is NDEBUG; _NDEBUG is most likely never defined, so this check is always compiled in - confirm the intent */
#ifndef _NDEBUG
        if((packet_buffer_ptr - 1) != &packet_buffer[packet_buffer.length() - 1]) {
            logCritical(this->client->getServer()->getServerId(),
                        "Buffer over/underflow: packet_buffer_ptr != &packet_buffer[packet_buffer.length() - 1]; packet_buffer_ptr := {}; packet_buffer.end() := {}",
                        (void*) packet_buffer_ptr,
                        (void*) &packet_buffer[packet_buffer.length() - 1]
            );
        }
#endif
        payload = packet_buffer;
    } else {
        auto packet = buffer->pop_front();
        packet_flags = packet.packet_flags;
        payload = packet.payload;
    }

    have_more |= buffer->front_set(); /* set the more flag if we have more to process */
    buffer_lock.unlock();

    if(packet_flags & PacketFlag::Compressed) {
        /*
         * The command has already been removed from the buffer.
         * If we've to drop it we still have to report pending commands (have_more),
         * else they would be stuck until the next command packet triggers the handling again.
         */
        auto decompressed_size = compression::qlz_decompressed_size(payload.data_ptr(), payload.length());
        if(decompressed_size == 0) {
            logTrace(this->client->getServerId(), "{} Failed to calculate decompressed size for received command. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
            return have_more;
        } else if(decompressed_size > 20 * 1024 * 1024) { /* max 20MB */
            logTrace(this->client->getServerId(), "{} Command packet has a too large decompressed size. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
            return have_more;
        }

        auto decompressed = buffer::allocate_buffer(decompressed_size);
        if(!compression::qlz_decompress_payload(payload.data_ptr(), decompressed.data_ptr(), &decompressed_size)) {
            logTrace(this->client->getServerId(), "{} Failed to decompress received command. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
            return have_more;
        }

        payload = decompressed.range(0, decompressed_size);
    }

    result = std::move(payload);
    return have_more;
}
void VoiceClientConnection::sendPacket(const shared_ptr<protocol::ServerPacket>& original_packet, bool copy, bool prepare_directly) {
if(this->client->state == ConnectionState::DISCONNECTED)
return;
shared_ptr<protocol::ServerPacket> packet;
if(copy) {
packet = protocol::ServerPacket::from_buffer(original_packet->buffer().dup(buffer::allocate_buffer(original_packet->buffer().length())));
if(original_packet->getListener())
packet->setListener(std::move(original_packet->getListener()));
packet->memory_state.flags = original_packet->memory_state.flags;
} else {
packet = original_packet;
}
auto type = WritePreprocessCategory::from_type(packet->type().type());
auto& queue = this->write_preprocess_queues[type];
if(prepare_directly) {
vector<pipes::buffer> buffers;
this->prepare_process_count++;
{
unique_lock work_lock{queue.work_lock};
if(!this->prepare_packet_for_write(buffers, packet, work_lock)) {
logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client));
this->prepare_process_count--;
return;
}
}
/* enqueue buffers for write */
{
lock_guard write_queue_lock(this->write_queue_lock);
this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end());
}
this->prepare_process_count--; /* we're now done preparing */
} else {
lock_guard queue_lock{queue.queue_lock};
queue.queue.push_back(packet);
queue.has_work = true;
}
this->triggerWrite();
}
/**
 * Converts a packet into the buffers which will be written to the socket.
 *
 * The packet gets compressed (if its type is compressable and it isn't a fragment entry),
 * split into fragments (if it exceeds the max length of its type), branded with packet ids,
 * encrypted and registered at the acknowledge handler.
 *
 * @param result    receives one buffer per fragment. May already contain some buffers if a later fragment fails.
 * @param packet    the packet to prepare.
 * @param work_lock the locked work lock of the packet category. It gets unlocked as soon as the packet ids
 *                  have been applied, because the remaining work does not depend on the order.
 *                  On early failures it stays locked.
 * @return false if the packet (or one of its fragments) could not be prepared and should be dropped.
 */
bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &result, const shared_ptr<ServerPacket> &packet, std::unique_lock<std::mutex>& work_lock) {
    assert(work_lock.owns_lock());

    string error = "success";
    if(packet->type().compressable() && !packet->memory_state.fragment_entry) {
        packet->enable_flag(PacketFlag::Compressed);
        if(!this->compress_handler.progressPacketOut(packet.get(), error)) {
            /* The format string contains two placeholders: the client prefix and the error message. */
            logError(this->getClient()->getServerId(), "{} Could not compress outgoing packet.\nThis could cause fatal failed for the client.\nError: {}", CLIENT_STR_LOG_PREFIX_(this->client), error);
            return false;
        }
    }

    std::vector<shared_ptr<ServerPacket>> fragments;
    fragments.reserve((size_t) (packet->data().length() / packet->type().max_length()) + 1);

    if(packet->data().length() > packet->type().max_length()) {
        if(!packet->type().fragmentable()) {
            logError(this->client->getServerId(), "{} We've tried to send a too long, not fragmentable, packet. Dropping packet of type {} with length {}", CLIENT_STR_LOG_PREFIX_(this->client), packet->type().name(), packet->data().length());
            return false;
        }

        { //Split packets
            auto buffer = packet->data();

            const auto max_length = packet->type().max_length();
            /* cut off full sized fragments as long as more than two full fragments are left */
            while(buffer.length() > max_length * 2) {
                fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer.view(0, max_length).dup(buffer::allocate_buffer(max_length))));
                buffer = buffer.range((size_t) max_length);
            }

            if(buffer.length() > max_length) { //Divide rest by 2
                fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer.view(0, buffer.length() / 2).dup(buffer::allocate_buffer(buffer.length() / 2))));
                buffer = buffer.range(buffer.length() / 2);
            }
            fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer));

            for(const auto& frag : fragments) {
                frag->setFragmentedEntry(true);
                frag->enable_flag(PacketFlag::NewProtocol);
            }
        }

        assert(fragments.size() >= 2);
        /* the first and the last fragment are marked as fragmented, the compressed flag is only carried by the first one */
        fragments.front()->enable_flag(PacketFlag::Fragmented);
        if(packet->has_flag(PacketFlag::Compressed))
            fragments.front()->enable_flag(PacketFlag::Compressed);

        fragments.back()->enable_flag(PacketFlag::Fragmented);
        if(packet->getListener())
            fragments.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :)
    } else {
        fragments.push_back(packet);
    }

    result.reserve(fragments.size());

    /* apply packet ids */
    for(const auto& fragment : fragments) {
        if(!fragment->memory_state.id_branded)
            fragment->applyPacketId(this->packet_id_manager);

        if(fragment->type().type() == protocol::PacketType::COMMAND_LOW || fragment->type().type() == protocol::PacketType::COMMAND)
            this->packet_statistics().send_command(fragment->type().type(), fragment->packetId() | fragment->generationId() << 16U);
    }
    work_lock.unlock(); /* the rest could be unordered */

    CryptHandler::key_t crypt_key{};
    CryptHandler::nonce_t crypt_nonce{};
    auto statistics = this->client ? this->client->connectionStatistics : nullptr;

    for(const auto& fragment : fragments) {
        if(fragment->has_flag(PacketFlag::Unencrypted)) {
            this->crypt_handler.write_default_mac(fragment->mac().data_ptr());
        } else {
            if(!this->client->crypto.protocol_encrypted) {
                crypt_key = CryptHandler::default_key;
                crypt_nonce = CryptHandler::default_nonce;
            } else {
                if(!this->crypt_handler.generate_key_nonce(false, fragment->type().type(), fragment->packetId(), fragment->generationId(), crypt_key, crypt_nonce)) {
                    logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce for sending a packet. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
                    return false;
                }
            }

            auto crypt_result = this->crypt_handler.encrypt(fragment->header().data_ptr(), fragment->header().length(),
                                                            fragment->data().data_ptr(), fragment->data().length(),
                                                            fragment->mac().data_ptr(),
                                                            crypt_key, crypt_nonce, error);
            if(!crypt_result){
                logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error);
                return false;
            }
        }

#ifndef CONNECTION_NO_STATISTICS
        if(statistics) {
            auto category = stats::ConnectionStatistics::category::from_type(fragment->type());
            statistics->logOutgoingPacket(category, fragment->length() + 96); /* 96 for the UDP packet overhead */
        }
#endif

        this->acknowledge_handler.process_packet(*fragment);
        result.push_back(fragment->buffer());
    }

    return true;
}
bool VoiceClientConnection::preprocess_write_packets() {
std::shared_ptr<ServerPacket> packet{nullptr};
vector<pipes::buffer> buffers{};
bool flag_more{false};
prepare_process_count++; /* we're not preparing a packet */
for(auto& category : this->write_preprocess_queues) {
if(!category.has_work) continue;
else if(packet) {
flag_more = true;
break;
}
unique_lock work_lock{category.work_lock, try_to_lock};
if(!work_lock) continue; /* This particular category will already be processed */
{
lock_guard buffer_lock{category.queue_lock};
if(category.queue.empty()) {
category.has_work = false;
continue;
}
packet = std::move(category.queue.front());
category.queue.pop_front();
category.has_work = !category.queue.empty();
flag_more = category.has_work;
}
if(!this->prepare_packet_for_write(buffers, packet, work_lock)) {
logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client));
if(flag_more)
break;
else
continue; /* find out if we have more */
}
if(flag_more)
break;
else
continue; /* find out if we have more */
}
/* enqueue buffers for write */
if(!buffers.empty()) {
lock_guard write_queue_lock(this->write_queue_lock);
this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end());
}
this->prepare_process_count--; /* we're now done preparing */
return flag_more;
}
/**
 * Takes the next buffer out of the write queue.
 *
 * @param target receives the buffer which should be written.
 * @return 2 if the client is disconnected or the write queue is empty (target is untouched),
 *         1 if a buffer has been popped and there are more buffers left,
 *         0 if a buffer has been popped and it was the last one.
 *         In outgoing fuzzing builds 0 is returned as well when the popped buffer should be dropped.
 */
int VoiceClientConnection::pop_write_buffer(pipes::buffer& target) {
    if(this->client->state == DISCONNECTED)
        return 2;

    lock_guard queue_guard{this->write_queue_lock};

    /* the size before popping tells us if there will be something left afterwards */
    const size_t pending = this->write_queue.size();
    if(pending == 0)
        return 2;

    target = std::move(this->write_queue.front());
    this->write_queue.pop_front();

#ifdef FUZZING_TESTING_OUTGOING
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
    if (this->client->state == ConnectionState::CONNECTED) {
#endif
        if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) {
            debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping outgoing packet", CLIENT_STR_LOG_PREFIX_(this->client));
            return 0;
        }
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
    }
#endif
#endif

    return pending > 1;
}
/**
 * Blocks until every preprocess queue and the write queue are empty and nobody is preparing a packet.
 * The state is polled every 5ms.
 *
 * @param until the deadline. A default constructed time point (zero ticks since epoch) disables the timeout.
 * @return true if everything has been flushed, false if the deadline has passed.
 */
bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) {
    const auto everything_flushed = [this]() -> bool {
        for(auto& queue : this->write_preprocess_queues) {
            {
                lock_guard queue_guard{queue.queue_lock};
                if(!queue.queue.empty())
                    return false;
            }

            /* if we can't acquire the work lock somebody is currently preparing a packet of this category */
            unique_lock work_guard{queue.work_lock, try_to_lock};
            if(!work_guard.owns_lock())
                return false;
        }

        lock_guard write_guard{this->write_queue_lock};
        if(!this->write_queue.empty())
            return false;

        return this->prepare_process_count == 0;
    };

    while(!everything_flushed()) {
        const bool has_deadline = until.time_since_epoch().count() != 0;
        if(has_deadline && system_clock::now() > until)
            return false;

        threads::self::sleep_for(milliseconds(5));
    }

    return true;
}
/**
 * Resets the connection state:
 * drops all packets which are waiting to be preprocessed, resets the acknowledge handler,
 * the crypt handler and the packet id manager, and finally resets all command fragment buffers.
 */
void VoiceClientConnection::reset() {
    for(auto& preprocess_queue : this->write_preprocess_queues) {
        lock_guard queue_guard{preprocess_queue.queue_lock};
        preprocess_queue.queue.clear();
    }

    this->acknowledge_handler.reset();
    this->crypt_handler.reset();
    this->packet_id_manager.reset();

    /* the fragment buffers are reset while holding the packet buffer lock */
    lock_guard buffer_guard(this->packet_buffer_lock);
    for(auto& fragment_buffer : this->_command_fragment_buffers)
        fragment_buffer.reset();
}
/**
 * Inserts a command at the front of the COMMAND fragment buffer and schedules the command handling.
 * The fragment is created with packet id 0, generation 0 and the Unencrypted flag.
 *
 * @param buffer the command payload. The fragment stores its own copy of it.
 */
void VoiceClientConnection::force_insert_command(const pipes::buffer_view &buffer) {
    CommandFragment forced_fragment{0, 0, PacketFlag::Unencrypted, (uint32_t) buffer.length(), buffer.own_buffer()};

    {
        auto& command_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];

        unique_lock buffer_guard(command_buffer.buffer_lock);
        command_buffer.push_front(std::move(forced_fragment));
    }

    /* without a voice server there is nobody who could execute the command */
    if(auto voice_server = this->client->voice_server; voice_server)
        voice_server->schedule_command_handling(this->client);
}
/**
 * Moves the index of the COMMAND fragment buffer to 1,
 * because the first packet (0) is already the clientinitiv packet.
 */
void VoiceClientConnection::register_initiv_packet() {
    auto& command_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];

    unique_lock buffer_guard(command_buffer.buffer_lock);
    command_buffer.set_full_index_to(1);
}