reworked the voice client connection part 1

This commit is contained in:
WolverinDEV 2020-07-29 19:05:38 +02:00
parent ad51f8118d
commit 4c91a7a3bf
23 changed files with 1162 additions and 759 deletions

View File

@ -43,7 +43,7 @@ set(SERVER_SOURCE_FILES
# MySQLLibSSLFix.c
src/client/ConnectedClient.cpp
src/client/voice/PrecomputedPuzzles.cpp
src/server/PrecomputedPuzzles.cpp
src/client/voice/VoiceClient.cpp
src/client/voice/VoiceClientHandschake.cpp
src/client/voice/VoiceClientCommandHandler.cpp
@ -146,7 +146,12 @@ set(SERVER_SOURCE_FILES
src/manager/ConversationManager.cpp
src/client/SpeakingClientHandshake.cpp
src/client/command_handler/music.cpp src/client/command_handler/file.cpp)
src/client/command_handler/music.cpp src/client/command_handler/file.cpp
src/client/voice/PacketDecoder.cpp
src/client/voice/PacketEncoder.cpp
)
if (COMPILE_WEB_CLIENT)
add_definitions(-DCOMPILE_WEB_CLIENT)

View File

@ -2,7 +2,7 @@
#include <deque>
#include <EventLoop.h>
#include "client/voice/PrecomputedPuzzles.h"
#include "src/server/PrecomputedPuzzles.h"
#include "server/VoiceIOManager.h"
#include "VirtualServer.h"
#include <query/command3.h>

View File

@ -639,19 +639,19 @@ bool ConnectedClient::handle_text_command(
protocol::PacketTypeInfo::Ping,
protocol::PacketTypeInfo::Pong}) {
auto id = vc->getConnection()->getPacketIdManager().currentPacketId(type);
auto gen = vc->getConnection()->getPacketIdManager().generationId(type);
auto& genestis = vc->getConnection()->get_incoming_generation_estimators();
//auto id = vc->getConnection()->getPacketIdManager().currentPacketId(type);
//auto gen = vc->getConnection()->getPacketIdManager().generationId(type);
//auto& genestis = vc->getConnection()->get_incoming_generation_estimators();
send_message(_this.lock(), " OUT " + type.name() + " => generation: " + to_string(gen) + " id: " + to_string(id));
send_message(_this.lock(), " IN " + type.name() + " => generation: " + to_string(genestis[type.type()].generation()) + " id: " + to_string(genestis[type.type()].current_packet_id()));
//send_message(_this.lock(), " OUT " + type.name() + " => generation: " + to_string(gen) + " id: " + to_string(id));
//send_message(_this.lock(), " IN " + type.name() + " => generation: " + to_string(genestis[type.type()].generation()) + " id: " + to_string(genestis[type.type()].current_packet_id()));
}
return true;
} else if(TARG(0, "ping")) {
auto vc = dynamic_pointer_cast<VoiceClient>(_this.lock());
if(!vc) return false;
auto& ack = vc->connection->getAcknowledgeManager();
auto& ack = vc->connection->packet_encoder().acknowledge_manager();
send_message(_this.lock(), "Command retransmission values:");
send_message(_this.lock(), " RTO : " + std::to_string(ack.current_rto()));
send_message(_this.lock(), " RTTVAR: " + std::to_string(ack.current_rttvar()));
@ -668,12 +668,14 @@ bool ConnectedClient::handle_text_command(
auto vc = dynamic_pointer_cast<VoiceClient>(_this.lock());
if(!vc) return false;
/*
auto& genestis = vc->getConnection()->get_incoming_generation_estimators();
if(type >= genestis.size()) {
send_message(_this.lock(), "Invalid type");
return true;
}
genestis[type].set_last_state(pid, generation);
*/
} catch(std::exception& ex) {
send_message(_this.lock(), "Failed to parse argument");
return true;

View File

@ -2,24 +2,23 @@
#include "ConnectedClient.h"
namespace ts {
namespace server {
class InternalClient : public ConnectedClient {
public:
InternalClient(sql::SqlManager*, const std::shared_ptr<server::VirtualServer>&, std::string, bool);
~InternalClient();
namespace ts::server {
class VirtualServer;
class InternalClient : public ConnectedClient {
public:
InternalClient(sql::SqlManager*, const std::shared_ptr<VirtualServer>&, std::string, bool);
~InternalClient();
void setSharedLock(const std::shared_ptr<ConnectedClient>& _this){
assert(_this.get() == this);
this->_this = _this;
}
void setSharedLock(const std::shared_ptr<ConnectedClient>& _this){
assert(_this.get() == this);
this->_this = _this;
}
void sendCommand(const ts::Command &command, bool low) override;
void sendCommand(const ts::command_builder &command, bool low) override;
bool close_connection(const std::chrono::system_clock::time_point& timeout = std::chrono::system_clock::time_point()) override;
bool disconnect(const std::string &reason) override;
protected:
void tick(const std::chrono::system_clock::time_point &time) override;
};
}
void sendCommand(const ts::Command &command, bool low) override;
void sendCommand(const ts::command_builder &command, bool low) override;
bool close_connection(const std::chrono::system_clock::time_point& timeout = std::chrono::system_clock::time_point()) override;
bool disconnect(const std::string &reason) override;
protected:
void tick(const std::chrono::system_clock::time_point &time) override;
};
}

View File

@ -272,7 +272,9 @@ command_result ConnectedClient::handleCommand(Command &cmd) {
else if (command == "listfeaturesupport") return this->handleCommandListFeatureSupport(cmd);
if (this->getType() == ClientType::CLIENT_QUERY) return command_result{error::command_not_found}; //Dont log query invalid commands
if (this->getType() == ClientType::CLIENT_QUERY)
return command_result{error::command_not_found}; //Dont log query invalid commands
if (this->getType() == ClientType::CLIENT_TEAMSPEAK)
if (command.empty() || command.find_first_not_of(' ') == -1) {
if (!permission::v2::permission_granted(1, this->calculate_permission(permission::b_client_allow_invalid_packet, this->getChannelId())))

View File

@ -101,7 +101,7 @@ bool MusicClient::disconnect(const std::string &reason) {
return true;
}
bool server::MusicClient::notifyClientMoved(
bool MusicClient::notifyClientMoved(
const std::shared_ptr<ConnectedClient> &client,
const std::shared_ptr<BasicChannel> &target_channel,
ViewReasonId reason,

View File

@ -0,0 +1,307 @@
//
// Created by WolverinDEV on 10/03/2020.
//
#include "PacketDecoder.h"
#include <protocol/buffers.h>
#include <protocol/AcknowledgeManager.h>
#include <protocol/CompressionHandler.h>
#include <protocol/CryptHandler.h>
#include "../../ConnectionStatistics.h"
using namespace ts;
using namespace ts::protocol;
using namespace ts::connection;
using namespace ts::server::server::udp;
ReassembledCommand *ReassembledCommand::allocate(size_t size) {
auto instance = (ReassembledCommand*) malloc(sizeof(ReassembledCommand) + size);
instance->length_ = size;
instance->capacity_ = size;
instance->next_command = nullptr;
return instance;
}
void ReassembledCommand::free(ReassembledCommand *command) {
::free(command);
}
PacketDecoder::PacketDecoder(ts::connection::CryptHandler *crypt_handler)
: crypt_handler_{crypt_handler} {
memtrack::allocated<PacketDecoder>(this);
}
PacketDecoder::~PacketDecoder() {
memtrack::freed<PacketDecoder>(this);
this->reset();
}
void PacketDecoder::reset() {
std::lock_guard buffer_lock(this->packet_buffer_lock);
for(auto& buffer : this->_command_fragment_buffers)
buffer.reset();
}
PacketProcessResult PacketDecoder::process_incoming_data(ClientPacketParser &packet_parser, std::string& error) {
#ifdef FUZZING_TESTING_INCOMMING
if(rand() % 100 < 20)
return PacketProcessResult::FUZZ_DROPPED;
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
if (this->client->state == ConnectionState::CONNECTED) {
#endif
if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) {
debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping incoming packet of length {}", CLIENT_STR_LOG_PREFIX_(this->client), buffer.length());
return;
}
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
}
#endif
#endif
auto result = this->decode_incoming_packet(error, packet_parser);
if(result != PacketProcessResult::SUCCESS)
return result;
#ifdef LOG_INCOMPING_PACKET_FRAGMENTS
debugMessage(lstream << CLIENT_LOG_PREFIX << "Recived packet. PacketId: " << packet->packetId() << " PacketType: " << packet->type().name() << " Flags: " << packet->flags() << " - " << packet->data() << endl);
#endif
auto is_command = packet_parser.type() == protocol::COMMAND || packet_parser.type() == protocol::COMMAND_LOW;
if(is_command) {
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())];
CommandFragment fragment_entry{
packet_parser.packet_id(),
packet_parser.estimated_generation(),
packet_parser.flags(),
(uint32_t) packet_parser.payload_length(),
packet_parser.payload().own_buffer()
};
std::unique_lock queue_lock(fragment_buffer.buffer_lock);
auto insert_result = fragment_buffer.insert_index2(packet_parser.full_packet_id(), std::move(fragment_entry));
if(insert_result != 0) {
queue_lock.unlock();
error = "pid: " + std::to_string(packet_parser.packet_id()) + ", ";
error += "bidx: " + std::to_string(fragment_buffer.current_index()) + ", ";
error += "bcap: " + std::to_string(fragment_buffer.capacity());
if(insert_result == -2) {
return PacketProcessResult::DUPLICATED_PACKET;
} else if(insert_result == -1) {
this->callback_send_acknowledge(this->callback_argument, packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
return PacketProcessResult::BUFFER_UNDERFLOW;
} else if(insert_result == 1) {
return PacketProcessResult::BUFFER_OVERFLOW;
}
assert(false);
return PacketProcessResult::UNKNOWN_ERROR;
}
this->callback_send_acknowledge(this->callback_argument, packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
ReassembledCommand* command{nullptr};
CommandReassembleResult assemble_result;
do {
if(!queue_lock.owns_lock())
queue_lock.lock();
assemble_result = this->try_reassemble_ordered_packet(fragment_buffer, queue_lock, command);
if(assemble_result == CommandReassembleResult::SUCCESS || assemble_result == CommandReassembleResult::MORE_COMMANDS_PENDING)
this->callback_decoded_command(this->callback_argument, command);
if(command) {
/* ownership hasn't transferred */
ReassembledCommand::free(command);
command = nullptr;
}
switch (assemble_result) {
case CommandReassembleResult::NO_COMMANDS_PENDING:
case CommandReassembleResult::SUCCESS:
case CommandReassembleResult::MORE_COMMANDS_PENDING:
break;
case CommandReassembleResult::SEQUENCE_LENGTH_TOO_LONG:
return PacketProcessResult::COMMAND_BUFFER_OVERFLOW;
case CommandReassembleResult::COMMAND_TOO_LARGE:
return PacketProcessResult::COMMAND_TOO_LARGE;
case CommandReassembleResult::COMMAND_DECOMPRESS_FAILED:
return PacketProcessResult::COMMAND_DECOMPRESS_FAILED;
default:
assert(false);
break;
}
} while(assemble_result == CommandReassembleResult::MORE_COMMANDS_PENDING);
} else {
this->callback_decoded_packet(this->callback_argument, packet_parser);
}
return PacketProcessResult::SUCCESS;
}
PacketProcessResult PacketDecoder::decode_incoming_packet(std::string& error, ClientPacketParser &packet_parser) {
assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size());
auto& generation_estimator = this->incoming_generation_estimators[packet_parser.type()];
{
std::lock_guard glock{this->incoming_generation_estimator_lock};
packet_parser.set_estimated_generation(generation_estimator.visit_packet(packet_parser.packet_id()));
}
/* decrypt the packet if needed */
if(packet_parser.is_encrypted()) {
CryptHandler::key_t crypt_key{};
CryptHandler::nonce_t crypt_nonce{};
auto data = (uint8_t*) packet_parser.mutable_data_ptr();
bool use_default_key{!this->crypt_handler_->encryption_initialized()}, decrypt_result;
decrypt_packet:
if(use_default_key) {
crypt_key = CryptHandler::kDefaultKey;
crypt_nonce = CryptHandler::kDefaultNonce;
} else {
if(!this->crypt_handler_->generate_key_nonce(true, packet_parser.type(), packet_parser.packet_id(), packet_parser.estimated_generation(), crypt_key, crypt_nonce))
return PacketProcessResult::DECRYPT_KEY_GEN_FAILED;
}
decrypt_result = this->crypt_handler_->decrypt(
data + ClientPacketParser::kHeaderOffset, ClientPacketParser::kHeaderLength,
data + ClientPacketParser::kPayloadOffset, packet_parser.payload_length(),
data,
crypt_key, crypt_nonce,
error
);
if(!decrypt_result) {
if(packet_parser.packet_id() < 10 && packet_parser.estimated_generation() == 0) {
if(use_default_key) {
return PacketProcessResult::DECRYPT_FAILED;
} else {
use_default_key = true;
goto decrypt_packet;
}
} else {
return PacketProcessResult::DECRYPT_FAILED;
}
}
packet_parser.set_decrypted();
}
return PacketProcessResult::SUCCESS;
}
bool PacketDecoder::verify_encryption(const pipes::buffer_view &buffer) {
ClientPacketParser packet_parser{buffer};
if(!packet_parser.valid() || !packet_parser.is_encrypted()) return false;
assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size());
return this->crypt_handler_->verify_encryption(buffer, packet_parser.packet_id(), this->incoming_generation_estimators[packet_parser.type()].generation());
}
void PacketDecoder::register_initiv_packet() {
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];
std::unique_lock buffer_lock(fragment_buffer.buffer_lock);
fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */
}
CommandReassembleResult PacketDecoder::try_reassemble_ordered_packet(
command_fragment_buffer_t &buffer,
std::unique_lock<std::mutex> &buffer_lock,
ReassembledCommand *&assembled_command) {
assert(buffer_lock.owns_lock());
if(!buffer.front_set())
return CommandReassembleResult::NO_COMMANDS_PENDING;
uint8_t packet_flags{0};
std::unique_ptr<ReassembledCommand, void(*)(ReassembledCommand*)> rcommand{nullptr, ReassembledCommand::free};
/* lets find out if we've to reassemble the packet */
auto& first_buffer = buffer.slot_value(0);
if(first_buffer.packet_flags & PacketFlag::Fragmented) {
uint16_t sequence_length{1};
size_t total_payload_length{first_buffer.payload_length};
do {
if(sequence_length >= buffer.capacity())
return CommandReassembleResult::SEQUENCE_LENGTH_TOO_LONG;
if(!buffer.slot_set(sequence_length))
return CommandReassembleResult::NO_COMMANDS_PENDING; /* we need more packets */
auto& packet = buffer.slot_value(sequence_length++);
total_payload_length += packet.payload_length;
if(packet.packet_flags & PacketFlag::Fragmented) {
/* yep we find the end */
break;
}
} while(true);
/* ok we have all fragments lets reassemble */
/*
* Packet sequence could never be so long. If it is so then the data_length() returned an invalid value.
* We're checking it here because we dont want to make a huge allocation
*/
assert(total_payload_length < 512 * 1024 * 1024);
rcommand.reset(ReassembledCommand::allocate(total_payload_length));
char* packet_buffer_ptr = rcommand->command();
size_t packet_count{0};
packet_flags = buffer.slot_value(0).packet_flags;
while(packet_count < sequence_length) {
auto fragment = buffer.pop_front();
memcpy(packet_buffer_ptr, fragment.payload.data_ptr(), fragment.payload_length);
packet_buffer_ptr += fragment.payload_length;
packet_count++;
}
#ifndef _NDEBUG
if((packet_buffer_ptr - 1) != &rcommand->command()[rcommand->length() - 1]) {
logCritical(0,
"Buffer over/underflow: packet_buffer_ptr != &packet_buffer[packet_buffer.length() - 1]; packet_buffer_ptr := {}; packet_buffer.end() := {}",
(void*) packet_buffer_ptr,
(void*) &rcommand->command()[rcommand->length() - 1]
);
}
#endif
} else {
auto packet = buffer.pop_front();
packet_flags = packet.packet_flags;
rcommand.reset(ReassembledCommand::allocate(packet.payload_length));
memcpy(rcommand->command(), packet.payload.data_ptr(), packet.payload_length);
}
auto more_commands_pending = buffer.front_set(); /* set the more flag if we have more to process */
buffer_lock.unlock();
if(packet_flags & PacketFlag::Compressed) {
std::string error{};
auto compressed_command = std::move(rcommand);
auto decompressed_size = compression::qlz_decompressed_size(compressed_command->command(), compressed_command->length());
if(decompressed_size > 64 * 1024 * 1024)
return CommandReassembleResult::COMMAND_TOO_LARGE;
rcommand.reset(ReassembledCommand::allocate(decompressed_size));
if(!compression::qlz_decompress_payload(compressed_command->command(), rcommand->command(), &decompressed_size))
return CommandReassembleResult::COMMAND_DECOMPRESS_FAILED;
rcommand->set_length(decompressed_size);
}
assembled_command = rcommand.release();
return more_commands_pending ? CommandReassembleResult::MORE_COMMANDS_PENDING : CommandReassembleResult::SUCCESS;
}

View File

@ -0,0 +1,133 @@
#pragma once
#include <misc/spin_mutex.h>
#include <mutex>
#include <deque>
#include <protocol/Packet.h>
#include <protocol/generation.h>
#include <protocol/ringbuffer.h>
namespace ts::connection {
class CryptHandler;
class CompressionHandler;
class AcknowledgeManager;
}
namespace ts::stats {
class ConnectionStatistics;
}
namespace ts::server::server::udp {
struct CommandFragment {
uint16_t packet_id{0};
uint16_t packet_generation{0};
uint8_t packet_flags{0};
uint32_t payload_length : 24;
pipes::buffer payload{};
CommandFragment() { this->payload_length = 0; }
CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload)
: packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags}, payload_length{payloadLength}, payload{std::move(payload)} {}
CommandFragment& operator=(const CommandFragment&) = default;
CommandFragment(const CommandFragment& other) = default;
CommandFragment(CommandFragment&&) = default;
};
static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer));
struct ReassembledCommand {
public:
static ReassembledCommand* allocate(size_t /* command length */);
static void free(ReassembledCommand* /* command */);
[[nodiscard]] inline size_t length() const { return this->length_; }
inline void set_length(size_t length) { assert(this->capacity_ >= length); this->length_ = length; }
[[nodiscard]] inline size_t capacity() const { return this->capacity_; }
[[nodiscard]] inline const char* command() const { return (const char*) this + sizeof(ReassembledCommand); }
[[nodiscard]] inline char* command() { return (char*) this + sizeof(ReassembledCommand); }
[[nodiscard]] inline std::string_view command_view() const { return std::string_view{this->command(), this->length()}; }
mutable ReassembledCommand* next_command; /* nullptr by default */
private:
explicit ReassembledCommand() = default;
size_t capacity_;
size_t length_;
};
enum struct PacketProcessResult {
SUCCESS,
UNKNOWN_ERROR,
FUZZ_DROPPED,
DUPLICATED_PACKET, /* error message contains debug properties */
BUFFER_OVERFLOW, /* error message contains debug properties */
BUFFER_UNDERFLOW, /* error message contains debug properties */
COMMAND_BUFFER_OVERFLOW, /* can cause a total connection drop */
COMMAND_SEQUENCE_LENGTH_TOO_LONG, /* unrecoverable error */
COMMAND_TOO_LARGE,
COMMAND_DECOMPRESS_FAILED,
DECRYPT_KEY_GEN_FAILED,
DECRYPT_FAILED, /* has custom message */
};
enum struct CommandReassembleResult {
SUCCESS,
MORE_COMMANDS_PENDING, /* equal with success */
NO_COMMANDS_PENDING,
COMMAND_TOO_LARGE, /* this is a fatal error to the connection */
COMMAND_DECOMPRESS_FAILED,
SEQUENCE_LENGTH_TOO_LONG /* unrecoverable error */
};
class PacketDecoder {
typedef protocol::FullPacketRingBuffer<CommandFragment, 32, CommandFragment> command_fragment_buffer_t;
typedef std::array<command_fragment_buffer_t, 2> command_packet_reassembler;
public:
/* direct function calls are better optimized out */
typedef void(*callback_decoded_packet_t)(void* /* cb argument */, const protocol::ClientPacketParser&);
typedef void(*callback_decoded_command_t)(void* /* cb argument */, ReassembledCommand*& /* command */); /* must move the command, else it gets freed*/
typedef void(*callback_send_acknowledge_t)(void* /* cb argument */, uint16_t /* packet id */, bool /* is command low */);
explicit PacketDecoder(connection::CryptHandler* /* crypt handler */);
~PacketDecoder();
void reset();
bool verify_encryption(const pipes::buffer_view& /* full packet */);
/* true if commands might be pending */
PacketProcessResult process_incoming_data(protocol::ClientPacketParser &/* packet */, std::string& /* error detail */);
void register_initiv_packet();
void* callback_argument{nullptr};
callback_decoded_packet_t callback_decoded_packet{[](auto, auto&){}}; /* needs to be valid all the time! */
callback_decoded_command_t callback_decoded_command{[](auto, auto&){}}; /* needs to be valid all the time! */
callback_send_acknowledge_t callback_send_acknowledge{[](auto, auto, auto){}}; /* needs to be valid all the time! */
private:
connection::CryptHandler* crypt_handler_{nullptr};
spin_mutex incoming_generation_estimator_lock{};
std::array<protocol::generation_estimator, 9> incoming_generation_estimators{}; /* implementation is thread save */
std::recursive_mutex packet_buffer_lock;
command_packet_reassembler _command_fragment_buffers;
static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) {
return packet_index & 0x1U; /* use 0 for command and 1 for command low */
}
PacketProcessResult decode_incoming_packet(std::string &error /* error */, protocol::ClientPacketParser &packet_parser/* packet */);
CommandReassembleResult try_reassemble_ordered_packet(command_fragment_buffer_t& /* buffer */, std::unique_lock<std::mutex>& /* buffer lock */, ReassembledCommand*& /* command */);
};
}

View File

@ -0,0 +1,344 @@
//
// Created by WolverinDEV on 09/03/2020.
//
#include "PacketEncoder.h"
#include <protocol/buffers.h>
#include <protocol/AcknowledgeManager.h>
#include <protocol/CompressionHandler.h>
#include <protocol/CryptHandler.cpp>
#include "../../ConnectionStatistics.h"
using namespace ts;
using namespace ts::server::server::udp;
PacketEncoder::PacketEncoder(ts::connection::CryptHandler *crypt_handler, client::PacketStatistics* pstats)
: crypt_handler_{crypt_handler}, packet_statistics_{pstats} {
this->acknowledge_manager_.callback_data = this;
this->acknowledge_manager_.destroy_packet = [](void* packet) {
reinterpret_cast<OutgoingServerPacket*>(packet)->unref();
};
this->acknowledge_manager_.callback_resend_failed = [](void* this_ptr, const auto& entry) {
auto encoder = reinterpret_cast<PacketEncoder*>(this_ptr);
encoder->callback_resend_failed(encoder->callback_data, entry);
};
}
PacketEncoder::~PacketEncoder() {
this->reset();
}
void PacketEncoder::reset() {
this->acknowledge_manager_.reset();
protocol::OutgoingServerPacket *whead, *rhead;
{
std::lock_guard wlock{this->write_queue_mutex};
whead = std::exchange(this->encrypt_queue_head, nullptr);
rhead = std::exchange(this->resend_queue_head, nullptr);
this->encrypt_queue_tail = &this->encrypt_queue_head;
this->resend_queue_tail = &this->resend_queue_head;
}
while(whead) {
auto next = whead->next;
whead->unref();
whead = next;
}
while(rhead) {
auto next = rhead->next;
rhead->unref();
rhead = next;
}
}
void PacketEncoder::send_packet(ts::protocol::OutgoingServerPacket *packet) {
uint32_t full_id;
{
std::lock_guard id_lock{this->packet_id_mutex};
full_id = this->packet_id_manager.generate_full_id(packet->packet_type());
}
packet->set_packet_id(full_id & 0xFFFFU);
packet->generation = full_id >> 16U;
{
std::lock_guard qlock{this->write_queue_mutex};
*this->encrypt_queue_tail = packet;
this->encrypt_queue_tail = &packet->next;
}
this->callback_request_write(this->callback_data);
auto category = stats::ConnectionStatistics::category::from_type(packet->packet_type());
this->callback_connection_stats(this->callback_data, category, packet->packet_length() + 96); /* 96 for the UDP packet overhead */
}
#define MAX_COMMAND_PACKET_PAYLOAD_LENGTH (487)
void PacketEncoder::send_command(const std::string_view &command, bool low, std::unique_ptr<threads::Future<bool>> ack_listener) {
bool own_data_buffer{false};
void* own_data_buffer_ptr; /* imutable! */
const char* data_buffer{command.data()};
size_t data_length{command.length()};
uint8_t head_pflags{0};
PacketType ptype{low ? PacketType::COMMAND_LOW : PacketType::COMMAND};
protocol::OutgoingServerPacket *packets_head{nullptr};
protocol::OutgoingServerPacket **packets_tail{&packets_head};
/* only compress "long" commands */
if(command.size() > 100) {
size_t max_compressed_payload_size = compression::qlz_compressed_size(command.data(), command.length());
auto compressed_buffer = ::malloc(max_compressed_payload_size);
size_t compressed_size{max_compressed_payload_size};
if(!compression::qlz_compress_payload(command.data(), command.length(), compressed_buffer, &compressed_size)) {
logCritical(0, "Failed to compress command packet. Dropping packet");
::free(compressed_buffer);
return;
}
/* we don't need to make the command longer than it is */
if(compressed_size < command.length()) {
own_data_buffer = true;
data_buffer = (char*) compressed_buffer;
own_data_buffer_ptr = compressed_buffer;
data_length = compressed_size;
head_pflags |= PacketFlag::Compressed;
} else {
::free(compressed_buffer);
}
}
uint8_t ptype_and_flags{(uint8_t) ((uint8_t) ptype | (uint8_t) PacketFlag::NewProtocol)};
if(data_length > MAX_COMMAND_PACKET_PAYLOAD_LENGTH) {
auto chunk_count = (size_t) ceil((float) data_length / (float) MAX_COMMAND_PACKET_PAYLOAD_LENGTH);
auto chunk_size = (size_t) ceil((float) data_length / (float) chunk_count);
while(true) {
auto bytes = min(chunk_size, data_length);
auto packet = protocol::allocate_outgoing_packet(bytes);
packet->type_and_flags = ptype_and_flags;
memcpy(packet->payload, data_buffer, bytes);
*packets_tail = packet;
packets_tail = &packet->next;
data_length -= bytes;
if(data_length == 0) {
packet->type_and_flags |= PacketFlag::Fragmented;
break;
}
data_buffer += bytes;
}
packets_head->type_and_flags |= PacketFlag::Fragmented;
} else {
auto packet = protocol::allocate_outgoing_packet(data_length);
packet->type_and_flags = ptype_and_flags;
memcpy(packet->payload, data_buffer, data_length);
packets_head = packet;
packets_tail = &packet->next;
}
{
std::lock_guard id_lock{this->packet_id_mutex};
uint32_t full_id;
auto head = packets_head;
while(head) {
full_id = this->packet_id_manager.generate_full_id(ptype);
head->set_packet_id(full_id & 0xFFFFU);
head->generation = full_id >> 16U;
head = head->next;
}
}
packets_head->type_and_flags |= head_pflags;
/* general stats */
auto head = packets_head;
while(head) {
this->callback_connection_stats(this->callback_data, StatisticsCategory::COMMAND, head->packet_length() + 96); /* 96 for the UDP overhead */
head = head->next;
}
/* loss stats */
{
auto head = packets_head;
while(head) {
auto full_packet_id = (uint32_t) (head->generation << 16U) | head->packet_id();
this->packet_statistics_->send_command(head->packet_type(), full_packet_id);
/* increase a reference for the ack handler */
head->ref();
/* Even thou the packet is yet unencrypted, it will be encrypted with the next write. The next write will be before the next resend because the next ptr must be null in order to resend a packet */
if(&head->next == packets_tail)
this->acknowledge_manager_.process_packet(ptype, full_packet_id, head, std::move(ack_listener));
else
this->acknowledge_manager_.process_packet(ptype, full_packet_id, head, nullptr);
head = head->next;
}
}
{
std::lock_guard qlock{this->write_queue_mutex};
*this->encrypt_queue_tail = packets_head;
this->encrypt_queue_tail = packets_tail;
}
this->callback_request_write(this->callback_data);
if(own_data_buffer)
::free(own_data_buffer_ptr);
}
void PacketEncoder::encrypt_pending_packets() {
OutgoingServerPacket* packets_head;
{
std::lock_guard wlock{this->write_queue_mutex};
packets_head = this->encrypt_queue_head;
this->encrypt_queue_head = nullptr;
this->encrypt_queue_tail = &this->encrypt_queue_head;
}
if(!packets_head)
return;
auto packet = packets_head;
while(packet) {
this->prepare_outgoing_packet(packet);
packet = packet->next;
}
}
bool PacketEncoder::prepare_outgoing_packet(ts::protocol::OutgoingServerPacket *packet) {
if(packet->type_and_flags & PacketFlag::Unencrypted) {
this->crypt_handler_->write_default_mac(packet->mac);
} else {
CryptHandler::key_t crypt_key{};
CryptHandler::nonce_t crypt_nonce{};
std::string error{};
if(!this->crypt_handler_->encryption_initialized()) {
crypt_key = CryptHandler::kDefaultKey;
crypt_nonce = CryptHandler::kDefaultNonce;
} else {
if(!this->crypt_handler_->generate_key_nonce(false, (uint8_t) packet->packet_type(), packet->packet_id(), packet->generation, crypt_key, crypt_nonce)) {
this->callback_crypt_error(this->callback_data, CryptError::KEY_GENERATION_FAILED, "");
return false;
}
}
auto crypt_result = this->crypt_handler_->encrypt((char*) packet->packet_data() + ServerPacketP::kHeaderOffset, ServerPacketP::kHeaderLength,
packet->payload, packet->payload_size,
packet->mac,
crypt_key, crypt_nonce, error);
if(!crypt_result) {
this->callback_crypt_error(this->callback_data, CryptError::KEY_GENERATION_FAILED, error);
return false;
}
}
return true;
}
PacketEncoder::BufferPopResult PacketEncoder::pop_write_buffer(protocol::OutgoingServerPacket *&result) {
bool need_prepare_packet{false}, more_packets{false};
{
std::lock_guard wlock{this->write_queue_mutex};
if(this->resend_queue_head) {
result = this->resend_queue_head;
if(result->next) {
assert(this->resend_queue_tail != &result->next);
this->resend_queue_head = result->next;
} else {
assert(this->resend_queue_tail == &result->next);
this->resend_queue_head = nullptr;
this->resend_queue_tail = &this->resend_queue_head;
}
} else if(this->encrypt_queue_head) {
result = this->encrypt_queue_head;
if(result->next) {
assert(this->encrypt_queue_tail != &result->next);
this->encrypt_queue_head = result->next;
} else {
assert(this->encrypt_queue_tail == &result->next);
this->encrypt_queue_head = nullptr;
this->encrypt_queue_tail = &this->encrypt_queue_head;
}
need_prepare_packet = true;
} else {
return BufferPopResult::DRAINED;
}
result->next = nullptr;
more_packets = this->resend_queue_head != nullptr || this->encrypt_queue_head != nullptr;
}
if(need_prepare_packet)
this->prepare_outgoing_packet(result);
return more_packets ? BufferPopResult::MORE_AVAILABLE : BufferPopResult::DRAINED;
}
void PacketEncoder::execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next) {
std::deque<std::shared_ptr<connection::AcknowledgeManager::Entry>> buffers{};
std::string error{};
this->acknowledge_manager_.execute_resend(now, next, buffers);
if(!buffers.empty()) {
size_t send_count{0};
{
lock_guard wlock{this->write_queue_mutex};
for(auto& buffer : buffers) {
auto packet = (protocol::OutgoingServerPacket*) buffer->packet_ptr;
/* Test if the packet is still in the write/enqueue queue */
if(packet->next)
continue;
if(&packet->next == this->encrypt_queue_tail || &packet->next == this->resend_queue_tail)
continue;
packet->ref(); /* for the write queue again */
*this->resend_queue_tail = packet;
this->resend_queue_tail = &packet->next;
send_count++;
buffer->resend_count++;
this->packet_statistics_->send_command((protocol::PacketType) buffer->packet_type, buffer->packet_full_id);
}
}
this->callback_request_write(this->callback_data);
this->callback_resend_stats(this->callback_data, buffers.size());
}
}
bool PacketEncoder::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) {
while(true) {
{
std::lock_guard wlock{this->write_queue_mutex};
if(this->encrypt_queue_head)
goto _wait;
if(this->resend_queue_head)
goto _wait;
}
break;
_wait:
if(until.time_since_epoch().count() != 0 && std::chrono::system_clock::now() > until)
return false;
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
return true;
}

View File

@ -0,0 +1,87 @@
#pragma once
#include <misc/spin_mutex.h>
#include <mutex>
#include <deque>
#include <protocol/Packet.h>
#include <protocol/AcknowledgeManager.h>
#include <src/ConnectionStatistics.h>
#include "./PacketStatistics.h"
namespace ts::connection {
class CryptHandler;
class AcknowledgeManager;
}
namespace ts::server::server::udp {
class PacketEncoder {
using AcknowledgeEntry = connection::AcknowledgeManager::Entry;
using StatisticsCategory = stats::ConnectionStatistics::category;
public:
enum struct BufferPopResult {
DRAINED,
MORE_AVAILABLE
};
enum struct CryptError {
KEY_GENERATION_FAILED,
ENCRYPT_FAILED /* contains some data */
};
typedef void(*callback_request_write_t)(void* /* user data */);
typedef void(*callback_crypt_error_t)(void* /* user data */, const CryptError& /* error */, const std::string& /* details */);
typedef void(*callback_resend_stats_t)(void* /* user data */, size_t /* resend packets */);
typedef void(*callback_resend_failed_t)(void* /* user data */, const std::shared_ptr<AcknowledgeEntry>& /* entry */);
typedef void(*callback_connection_stats_t)(void* /* user data */, StatisticsCategory::value, size_t /* bytes */);
explicit PacketEncoder(connection::CryptHandler* /* crypt handler */, client::PacketStatistics* /* packet stats */);
~PacketEncoder();
void reset();
void send_packet(protocol::OutgoingServerPacket* /* packet */); /* will claim ownership */
void send_command(const std::string_view& /* build command command */, bool /* command low */, std::unique_ptr<threads::Future<bool>> /* acknowledge listener */);
void execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next);
void encrypt_pending_packets();
bool wait_empty_write_and_prepare_queue(std::chrono::time_point<std::chrono::system_clock> until = std::chrono::time_point<std::chrono::system_clock>());
/* if the result is true, ownership has been transferred */
BufferPopResult pop_write_buffer(protocol::OutgoingServerPacket*& /* packet */);
[[nodiscard]] inline auto& acknowledge_manager() { return this->acknowledge_manager_; }
/* callbacks must be valid all the time! */
void* callback_data{nullptr};
callback_request_write_t callback_request_write{[](auto){}};
callback_crypt_error_t callback_crypt_error{[](auto, auto, auto){}};
callback_resend_stats_t callback_resend_stats{[](auto, auto){}};
callback_resend_failed_t callback_resend_failed{[](auto, auto){}};
callback_connection_stats_t callback_connection_stats{[](auto, auto, auto){}};
private:
connection::CryptHandler* crypt_handler_{nullptr};
client::PacketStatistics* packet_statistics_{nullptr};
connection::AcknowledgeManager acknowledge_manager_{};
spin_mutex write_queue_mutex{};
protocol::OutgoingServerPacket* resend_queue_head{nullptr};
protocol::OutgoingServerPacket** resend_queue_tail{&resend_queue_head};
protocol::OutgoingServerPacket* encrypt_queue_head{nullptr};
protocol::OutgoingServerPacket** encrypt_queue_tail{&encrypt_queue_head};
protocol::PacketIdManager packet_id_manager;
spin_mutex packet_id_mutex{};
/* thread save function */
bool prepare_outgoing_packet(protocol::OutgoingServerPacket* /* packet */);
};
}

View File

@ -56,6 +56,7 @@ void VoiceClient::sendCommand0(const std::string_view& cmd, bool low, std::uniqu
logTrace(this->getServerId(), "{}[Command][Server -> Client] Sending command {}. Command low: {}. Full command: {}", CLIENT_STR_LOG_PREFIX, cmd.substr(0, cmd.find(' ')), low, cmd);
#endif
}
void VoiceClient::sendAcknowledge(uint16_t packetId, bool low) {
char buffer[2];
le2be16(packetId, buffer);
@ -222,7 +223,7 @@ bool VoiceClient::close_connection(const system_clock::time_point &timeout) {
while(this->state == DISCONNECTING_FLUSHING) {
if(system_clock::now() > timeout){
auto write_queue_flushed = this->connection->wait_empty_write_and_prepare_queue(timeout);
auto acknowledge_received = connection->acknowledge_handler.awaiting_acknowledge() == 0;
auto acknowledge_received = connection->packet_encoder().acknowledge_manager().awaiting_acknowledge() == 0;
if(write_queue_flushed && acknowledge_received)
break;
@ -233,7 +234,7 @@ bool VoiceClient::close_connection(const system_clock::time_point &timeout) {
if(!this->connection->wait_empty_write_and_prepare_queue(timeout))
continue;
if(connection->acknowledge_handler.awaiting_acknowledge() > 0) {
if(connection->packet_encoder().acknowledge_manager().awaiting_acknowledge() > 0) {
usleep(5000);
continue;
}
@ -296,7 +297,7 @@ void VoiceClient::send_voice_whisper_packet(const pipes::buffer_view &voice_buff
}
float VoiceClient::current_ping_deviation() {
return this->connection->getAcknowledgeManager().current_rttvar();
return this->connection->packet_encoder().acknowledge_manager().current_rttvar();
}
float VoiceClient::current_packet_loss() const {

View File

@ -13,14 +13,13 @@
#include "../ConnectedClient.h"
#include "protocol/CryptHandler.h"
#include "VoiceClientConnection.h"
#include "PrecomputedPuzzles.h"
#include "src/server/PrecomputedPuzzles.h"
#include "../../lincense/TeamSpeakLicense.h"
//#define LOG_INCOMPING_PACKET_FRAGMENTS
//#define LOG_AUTO_ACK_AUTORESPONSE
//#define LOG_AUTO_ACK_REQUEST
//#define LOG_AUTO_ACK_RESPONSE
//#define LOG_PKT_RESEND
#define PKT_LOG_CMD
//#define PKT_LOG_VOICE
@ -48,14 +47,14 @@ namespace ts {
friend class io::IOServerHandler;
public:
VoiceClient(const std::shared_ptr<VoiceServer>& server,const sockaddr_storage*);
~VoiceClient();
~VoiceClient() override;
bool close_connection(const std::chrono::system_clock::time_point &timeout) override;
bool disconnect(const std::string&) override;
bool disconnect(ViewReasonId /* reason type */, const std::string& /* reason */, const std::shared_ptr<ts::server::ConnectedClient>& /* invoker */, bool /* notify viewer */);
virtual void sendCommand(const ts::Command &command, bool low = false) { return this->sendCommand0(command.build(), low); }
virtual void sendCommand(const ts::command_builder &command, bool low) { return this->sendCommand0(command.build(), low); }
void sendCommand(const ts::Command &command, bool low = false) override { return this->sendCommand0(command.build(), low); }
void sendCommand(const ts::command_builder &command, bool low) override { return this->sendCommand0(command.build(), low); }
/* Note: Order is only guaranteed if progressDirectly is on! */
virtual void sendCommand0(const std::string_view& /* data */, bool low = false, std::unique_ptr<threads::Future<bool>> listener = nullptr);
@ -77,16 +76,13 @@ namespace ts {
void initialize();
virtual void tick(const std::chrono::system_clock::time_point &time) override;
/* Attention these handle callbacks are not thread save! */
void handlePacketCommand(const pipes::buffer_view&);
void handlePacketAck(const protocol::ClientPacketParser&);
void handlePacketVoice(const protocol::ClientPacketParser&);
void handlePacketVoiceWhisper(const protocol::ClientPacketParser&);
void handlePacketPing(const protocol::ClientPacketParser&);
void handlePacketInit(const protocol::ClientPacketParser&);
//Handshake helpers
public:
void send_voice_packet(const pipes::buffer_view &packet, const VoicePacketFlags &flags) override;
void send_voice_whisper_packet(const pipes::buffer_view &packet, const VoicePacketFlags &flags) override;

View File

@ -57,7 +57,7 @@ inline bool calculate_security_level(int& result, ecc_key* pubKey, size_t offset
command_result VoiceClient::handleCommandClientInit(Command &cmd) {
this->crypto.client_init = true;
this->connection->acknowledge_handler.reset();
this->connection->packet_encoder().acknowledge_manager().reset();
if(this->getType() == ClientType::CLIENT_TEAMSPEAK) {
int securityLevel;

View File

@ -1,4 +1,3 @@
#include <misc/endianness.h>
#include <algorithm>
#include <log/LogUtils.h>
#include "../../server/VoiceServer.h"
@ -6,7 +5,6 @@
#include <protocol/Packet.h>
#include <ThreadPool/Timer.h>
#include "VoiceClientConnection.h"
#include "src/client/ConnectedClient.h"
#include "VoiceClient.h"
@ -29,14 +27,22 @@ using namespace ts::connection;
using namespace ts::protocol;
using namespace ts::server;
VoiceClientConnection::VoiceClientConnection(VoiceClient* client) : client(client) {
VoiceClientConnection::VoiceClientConnection(VoiceClient* client)
: client{client}, crypt_handler{}, packet_decoder_{&this->crypt_handler}, packet_encoder_{&this->crypt_handler, &this->packet_statistics_} {
memtrack::allocated<VoiceClientConnection>(this);
this->acknowledge_handler.destroy_packet = [](void* packet) {
reinterpret_cast<OutgoingServerPacket*>(packet)->unref();
};
this->packet_decoder_.callback_argument = this;
this->packet_decoder_.callback_decoded_packet = VoiceClientConnection::callback_packet_decoded;
this->packet_decoder_.callback_decoded_command = VoiceClientConnection::callback_command_decoded;
this->packet_decoder_.callback_send_acknowledge = VoiceClientConnection::callback_send_acknowledge;
this->packet_encoder_.callback_data = this;
this->packet_encoder_.callback_request_write = VoiceClientConnection::callback_request_write;
this->packet_encoder_.callback_crypt_error = VoiceClientConnection::callback_encode_crypt_error;
this->packet_encoder_.callback_resend_failed = VoiceClientConnection::callback_resend_failed;
this->packet_encoder_.callback_resend_stats = VoiceClientConnection::callback_resend_statistics;
this->packet_encoder_.callback_connection_stats = VoiceClientConnection::callback_outgoing_connection_statistics;
this->crypt_handler.reset();
debugMessage(client->getServer()->getServerId(), "Allocated new voice client connection at {}", (void*) this);
}
@ -52,33 +58,15 @@ void VoiceClientConnection::triggerWrite() {
}
#ifdef CLIENT_LOG_PREFIX
#undef CLIENT_LOG_PREFIX
#undef CLIENT_LOG_PREFIX
#endif
#define CLIENT_LOG_PREFIX "[" << this->client->getPeerIp() << ":" << this->client->getPeerPort() << " | " << this->client->getDisplayName() << "]"
//Message handle methods
void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& buffer) {
#ifdef FUZZING_TESTING_INCOMMING
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
if (this->client->state == ConnectionState::CONNECTED) {
#endif
if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) {
debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping incoming packet of length {}", CLIENT_STR_LOG_PREFIX_(this->client), buffer.length());
return;
}
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
}
#endif
#endif
ClientPacketParser packet_parser{buffer};
if(!packet_parser.valid()) {
logTrace(this->client->getServerId(), "{} Received invalid packet. Dropping.", CLIENT_STR_LOG_PREFIX_(this->client));
if(!packet_parser.valid())
return;
}
assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size());
packet_parser.set_estimated_generation(this->incoming_generation_estimators[packet_parser.type()].visit_packet(packet_parser.packet_id()));
#ifndef CONNECTION_NO_STATISTICS
if(this->client) {
@ -88,170 +76,150 @@ void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& b
this->packet_statistics().received_packet((protocol::PacketType) packet_parser.type(), packet_parser.full_packet_id());
#endif
auto is_command = packet_parser.type() == protocol::COMMAND || packet_parser.type() == protocol::COMMAND_LOW;
std::string error{};
auto result = this->packet_decoder_.process_incoming_data(packet_parser, error);
using PacketProcessResult = server::server::udp::PacketProcessResult;
switch (result) {
case PacketProcessResult::SUCCESS:
case PacketProcessResult::FUZZ_DROPPED: /* maybe some kind of log? */
case PacketProcessResult::DECRYPT_FAILED: /* Silently drop this packet */
case PacketProcessResult::DUPLICATED_PACKET: /* no action needed, acknowledge should be send already */
break;
/* in previous versions we checked if the arrived packet is "worth decoding".
* But since in general a command buffer underflow is much more unlikely, especially because most packets are not even command packets,
* it's better we just skip that step and decode it anyways */
case PacketProcessResult::DECRYPT_KEY_GEN_FAILED:
/* no action needed, acknowledge should be send */
logCritical(this->client->getServerId(), "{} Failed to generate decrypt key. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
break;
#if 0
/* pretest if the packet is worth the effort of decoding it */
if(is_command) {
/* handle the order stuff */
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())];
unique_lock queue_lock(fragment_buffer.buffer_lock);
auto result = fragment_buffer.accept_index(packet_parser.packet_id());
if(result != 0) { /* packet index is ahead buffer index */
debugMessage(this->client->getServerId(), "{} Dropping command packet because command assembly buffer has an {} ({}|{}|{})",
case PacketProcessResult::BUFFER_OVERFLOW:
case PacketProcessResult::BUFFER_UNDERFLOW:
debugMessage(this->client->getServerId(), "{} Dropping command packet because command assembly buffer has an {}: {}",
CLIENT_STR_LOG_PREFIX_(this->client),
result == -1 ? "underflow" : "overflow",
fragment_buffer.capacity(),
fragment_buffer.current_index(),
packet_parser.packet_id()
result == PacketProcessResult::BUFFER_UNDERFLOW ? "underflow" : "overflow",
error
);
break;
if(result == -1) { /* underflow */
/* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */
if(this->client->crypto.protocol_encrypted)
this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
}
return;
}
}
#endif
case PacketProcessResult::UNKNOWN_ERROR:
logCritical(this->client->getServerId(), "{} Having an unknown error while processing a incoming packet: {}",
CLIENT_STR_LOG_PREFIX_(this->client),
error
);
goto disconnect_client;
//NOTICE I found out that the Compressed flag is set if the packet contains an audio header
case PacketProcessResult::COMMAND_BUFFER_OVERFLOW:
debugMessage(this->client->getServerId(), "{} Having a command buffer overflow. This might cause the client to drop.", CLIENT_STR_LOG_PREFIX_(this->client));
break;
if(this->client->state == ConnectionState::INIT_LOW && packet_parser.type() != protocol::INIT1)
return;
case PacketProcessResult::COMMAND_DECOMPRESS_FAILED:
logWarning(this->client->getServerId(), "{} Failed to decompress a command packet. Dropping command.", CLIENT_STR_LOG_PREFIX_(this->client));
break;
/* decrypt the packet if needed */
if(packet_parser.is_encrypted()) {
std::string error;
case PacketProcessResult::COMMAND_TOO_LARGE:
logWarning(this->client->getServerId(), "{} Received a too large command. Dropping client.", CLIENT_STR_LOG_PREFIX_(this->client));
goto disconnect_client;
CryptHandler::key_t crypt_key{};
CryptHandler::nonce_t crypt_nonce{};
case PacketProcessResult::COMMAND_SEQUENCE_LENGTH_TOO_LONG:
logWarning(this->client->getServerId(), "{} Received a too long command sequence. Dropping client.", CLIENT_STR_LOG_PREFIX_(this->client));
goto disconnect_client;
auto data = (uint8_t*) packet_parser.mutable_data_ptr();
bool use_default_key{!this->client->crypto.protocol_encrypted}, decrypt_result;
decrypt_packet:
if(use_default_key) {
crypt_key = CryptHandler::default_key;
crypt_nonce = CryptHandler::default_nonce;
} else {
if(!this->crypt_handler.generate_key_nonce(true, packet_parser.type(), packet_parser.packet_id(), packet_parser.estimated_generation(), crypt_key, crypt_nonce)) {
logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
return;
}
}
decrypt_result = this->crypt_handler.decrypt(
data + ClientPacketParser::kHeaderOffset, ClientPacketParser::kHeaderLength,
data + ClientPacketParser::kPayloadOffset, packet_parser.payload_length(),
data,
crypt_key, crypt_nonce,
error
);
if(!decrypt_result) {
if(!this->client->crypto.client_init) {
if(use_default_key) {
logTrace(this->client->getServerId(), "{} Failed to decrypt packet with default key ({}). Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), error);
return;
} else {
logTrace(this->client->getServerId(), "{} Failed to decrypt packet ({}). Trying with default key.", CLIENT_STR_LOG_PREFIX_(this->client), error);
use_default_key = true;
goto decrypt_packet;
}
} else {
logTrace(this->client->getServerId(), "{} Failed to decrypt packet ({}). Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), error);
return;
}
}
packet_parser.set_decrypted();
} else if(is_command && this->client->state != ConnectionState::INIT_HIGH) {
logTrace(this->client->getServerId(), "{} Voice client {}/{} tried to send a unencrypted command packet. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), client->getDisplayName(), this->client->getLoggingPeerIp());
return;
default:
assert(false);
break;
}
#ifdef LOG_INCOMPING_PACKET_FRAGMENTS
debugMessage(lstream << CLIENT_LOG_PREFIX << "Recived packet. PacketId: " << packet->packetId() << " PacketType: " << packet->type().name() << " Flags: " << packet->flags() << " - " << packet->data() << endl);
#endif
if(is_command) {
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())];
CommandFragment fragment_entry{
packet_parser.packet_id(),
packet_parser.estimated_generation(),
return;
packet_parser.flags(),
(uint32_t) packet_parser.payload_length(),
packet_parser.payload().own_buffer()
};
disconnect_client:;
/* FIXME: Disconnect the client */
}
{
unique_lock queue_lock(fragment_buffer.buffer_lock);
void VoiceClientConnection::callback_send_acknowledge(void *ptr_this, uint16_t packet_id, bool command_low) {
/* FIXME: Move this to the connection! */
reinterpret_cast<VoiceClientConnection*>(ptr_this)->client->sendAcknowledge(packet_id, command_low);
}
if(!fragment_buffer.insert_index(packet_parser.packet_id(), std::move(fragment_entry))) {
auto ignore_type = fragment_buffer.accept_index(packet_parser.packet_id());
debugMessage(this->client->getServerId(), "{} Dropping command packet because command assembly buffer has an {} ({}|{}|{})",
CLIENT_STR_LOG_PREFIX_(this->client),
ignore_type == -1 ? "underflow" : "overflow",
fragment_buffer.capacity(),
fragment_buffer.current_index(),
packet_parser.packet_id()
);
void VoiceClientConnection::callback_packet_decoded(void *ptr_this, const ts::protocol::ClientPacketParser &packet) {
auto connection = reinterpret_cast<VoiceClientConnection*>(ptr_this);
switch (packet.type()) {
case protocol::VOICE:
connection->client->handlePacketVoice(packet);
break;
if(ignore_type == -1) { /* underflow */
/* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */
this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
}
return;
}
}
this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
case protocol::VOICE_WHISPER:
connection->client->handlePacketVoiceWhisper(packet);
break;
auto voice_server = this->client->voice_server;
if(voice_server)
voice_server->schedule_command_handling(this->client);
} else {
if(packet_parser.type() == protocol::VOICE)
this->client->handlePacketVoice(packet_parser);
else if(packet_parser.type() == protocol::VOICE_WHISPER)
this->client->handlePacketVoiceWhisper(packet_parser);
else if(packet_parser.type() == protocol::ACK || packet_parser.type() == protocol::ACK_LOW)
this->client->handlePacketAck(packet_parser);
else if(packet_parser.type() == protocol::PING || packet_parser.type() == protocol::PONG)
this->client->handlePacketPing(packet_parser);
else {
logError(this->client->getServerId(), "{} Received hand decoded packet, but we've no method to handle it. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
}
case protocol::ACK:
case protocol::ACK_LOW:
connection->client->handlePacketAck(packet);
break;
case protocol::PING:
case protocol::PONG:
connection->client->handlePacketPing(packet);
break;
default:
assert(false);
logError(connection->client->getServerId(), "{} Received hand decoded packet, but we've no method to handle it. Dropping packet.", CLIENT_STR_LOG_PREFIX_(connection->client));
break;
}
}
bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer /* incl. mac etc */) {
ClientPacketParser packet_parser{buffer};
if(!packet_parser.valid() || !packet_parser.is_encrypted()) return false;
void VoiceClientConnection::callback_command_decoded(void *ptr_this, ts::server::server::udp::ReassembledCommand *&command) {
auto connection = reinterpret_cast<VoiceClientConnection*>(ptr_this);
assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size());
return this->crypt_handler.verify_encryption(buffer, packet_parser.packet_id(), this->incoming_generation_estimators[packet_parser.type()].generation());
/* we're exchanging the command so we're taking the ownership */
connection->enqueue_command_execution(std::exchange(command, nullptr));
}
bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer /* incl. mac etc */) {
return this->packet_decoder_.verify_encryption(buffer);
}
void VoiceClientConnection::enqueue_command_execution(ReassembledCommand *command) {
assert(!command->next_command);
bool command_handling_scheduled{false};
{
std::lock_guard pc_lock{this->pending_commands_lock};
*this->pending_commands_tail = command;
this->pending_commands_tail = &command->next_command;
command_handling_scheduled = std::exchange(this->has_command_handling_scheduled, true);
}
if(!command_handling_scheduled) {
auto voice_server = this->client->voice_server;
if(voice_server)
voice_server->schedule_command_handling(this->client);
}
}
void VoiceClientConnection::execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */) {
if(this->client->state >= ConnectionState::DISCONNECTING || !this->client->getServer())
return;
//TODO: Remove the buffer_execute_lock and use the one within the this->client->handlePacketCommand method
unique_lock<std::recursive_timed_mutex> buffer_execute_lock;
pipes::buffer payload{};
uint16_t packet_id{};
auto reexecute_handle = this->next_reassembled_command(buffer_execute_lock, payload, packet_id);
std::unique_ptr<ReassembledCommand, void(*)(ReassembledCommand*)> pending_command{nullptr, ReassembledCommand::free};
while(true) {
{
std::lock_guard pc_lock{this->pending_commands_lock};
pending_command.reset(this->pending_commands_head);
if(!pending_command) {
this->has_command_handling_scheduled = false;
return;
} else if(pending_command->next_command) {
this->pending_commands_head = pending_command->next_command;
} else {
this->pending_commands_head = nullptr;
this->pending_commands_tail = &this->pending_commands_head;
}
}
if(!payload.empty()){
auto startTime = system_clock::now();
try {
this->client->handlePacketCommand(payload);
this->client->handlePacketCommand(pipes::buffer_view{pending_command->command(), pending_command->length()});
} catch (std::exception& ex) {
logCritical(this->client->getServerId(), "{} Exception reached root tree! {}", CLIENT_STR_LOG_PREFIX_(this->client), ex.what());
}
@ -264,343 +232,8 @@ void VoiceClientConnection::execute_handle_command_packets(const std::chrono::sy
duration_cast<milliseconds>(end - startTime).count()
);
}
}
if(buffer_execute_lock.owns_lock())
buffer_execute_lock.unlock();
auto voice_server = this->client->voice_server;
if(voice_server && (reexecute_handle || this->should_reassembled_reschedule)) {
should_reassembled_reschedule = false;
this->client->voice_server->schedule_command_handling(this->client);
}
}
/* buffer_execute_lock: lock for in order execution */
bool VoiceClientConnection::next_reassembled_command(unique_lock<std::recursive_timed_mutex>& buffer_execute_lock, pipes::buffer& result, uint16_t& packet_id) {
command_fragment_buffer_t* buffer{nullptr};
unique_lock<std::recursive_timed_mutex> buffer_lock; /* general buffer lock */
bool have_more{false};
{
//FIXME: Currently command low packets cant be handeled if there is a command packet stuck in reassamble
/* handle commands before command low packets */
for(auto& buf : this->_command_fragment_buffers) {
unique_lock ring_lock(buf.buffer_lock, try_to_lock); //Perm lock the buffer else, may command wount get handeled. Because we've more left, but say we waven't
if(!ring_lock.owns_lock()) {
this->should_reassembled_reschedule = true;
continue;
}
if(buf.front_set()) {
if(!buffer) { /* lets still test for reexecute */
buffer_execute_lock = unique_lock(buf.execute_lock, try_to_lock);
if(!buffer_execute_lock.owns_lock()) {
this->should_reassembled_reschedule = true;
continue;
}
buffer_lock = move(ring_lock);
buffer = &buf;
} else {
have_more = true;
break;
}
}
}
}
if(!buffer)
return false; /* we've no packets */
uint8_t packet_flags{0};
pipes::buffer payload{};
/* lets find out if we've to reassemble the packet */
auto& first_buffer = buffer->slot_value(0);
packet_id = first_buffer.packet_id;
if(first_buffer.packet_flags & PacketFlag::Fragmented) {
uint16_t sequence_length{1};
size_t total_payload_length{first_buffer.payload_length};
do {
if(sequence_length >= buffer->capacity()) {
logError(this->client->getServerId(), "{} Command fragment buffer is full, and there is not fragmented packet end. Dropping full buffer which will probably cause a connection loss.", CLIENT_STR_LOG_PREFIX_(this->client));
buffer->clear();
return false; /* we've nothing to handle */
}
if(!buffer->slot_set(sequence_length))
return false; /* we need more packets */
auto& packet = buffer->slot_value(sequence_length++);
total_payload_length += packet.payload_length;
if(packet.packet_flags & PacketFlag::Fragmented) {
/* yep we find the end */
break;
}
} while(true);
/* ok we have all fragments lets reassemble */
/*
* Packet sequence could never be so long. If it is so then the data_length() returned an invalid value.
* We're checking it here because we dont want to make a huge allocation
*/
assert(total_payload_length < 512 * 1024 * 1024);
pipes::buffer packet_buffer{total_payload_length};
char* packet_buffer_ptr = &packet_buffer[0];
size_t packet_count{0};
packet_flags = buffer->slot_value(0).packet_flags;
while(packet_count < sequence_length) {
auto fragment = buffer->pop_front();
memcpy(packet_buffer_ptr, fragment.payload.data_ptr(), fragment.payload_length);
packet_buffer_ptr += fragment.payload_length;
packet_count++;
}
#ifndef _NDEBUG
if((packet_buffer_ptr - 1) != &packet_buffer[packet_buffer.length() - 1]) {
logCritical(this->client->getServer()->getServerId(),
"Buffer over/underflow: packet_buffer_ptr != &packet_buffer[packet_buffer.length() - 1]; packet_buffer_ptr := {}; packet_buffer.end() := {}",
(void*) packet_buffer_ptr,
(void*) &packet_buffer[packet_buffer.length() - 1]
);
}
#endif
payload = packet_buffer;
} else {
auto packet = buffer->pop_front();
packet_flags = packet.packet_flags;
payload = packet.payload;
}
have_more |= buffer->front_set(); /* set the more flag if we have more to process */
buffer_lock.unlock();
if(packet_flags & PacketFlag::Compressed) {
std::string error{};
auto decompressed_size = compression::qlz_decompressed_size(payload.data_ptr(), payload.length());
if(decompressed_size == 0) {
logTrace(this->client->getServerId(), "{} Failed to calculate decompressed size for received command. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
return false;
} else if(decompressed_size > 20 * 1024 * 1024) { /* max 20MB */
logTrace(this->client->getServerId(), "{} Command packet has a too large compressed size. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
return false;
}
auto decompress_buffer = buffer::allocate_buffer(decompressed_size);
if(!compression::qlz_decompress_payload(payload.data_ptr(), decompress_buffer.data_ptr(), &decompressed_size)) {
logTrace(this->client->getServerId(), "{} Failed to decompress received command. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
return false;
}
payload = decompress_buffer.range(0, decompressed_size);
}
result = std::move(payload);
return have_more;
}
bool VoiceClientConnection::prepare_outgoing_packet(ts::protocol::OutgoingServerPacket *packet) {
if(packet->type_and_flags & PacketFlag::Unencrypted) {
this->crypt_handler.write_default_mac(packet->mac);
} else {
CryptHandler::key_t crypt_key{};
CryptHandler::nonce_t crypt_nonce{};
std::string error{};
if(!this->client->crypto.protocol_encrypted) {
crypt_key = CryptHandler::default_key;
crypt_nonce = CryptHandler::default_nonce;
} else {
if(!this->crypt_handler.generate_key_nonce(false, (uint8_t) packet->packet_type(), packet->packet_id(), packet->generation, crypt_key, crypt_nonce)) {
logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce for sending a packet. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
return false;
}
}
auto crypt_result = this->crypt_handler.encrypt((char*) packet->packet_data() + ServerPacketP::kHeaderOffset, ServerPacketP::kHeaderLength,
packet->payload, packet->payload_size,
packet->mac,
crypt_key, crypt_nonce, error);
if(!crypt_result){
logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error);
return false;
}
}
return true;
}
VoiceClientConnection::WBufferPopResult VoiceClientConnection::pop_write_buffer(protocol::OutgoingServerPacket *&result) {
if(this->client->state == ConnectionState::DISCONNECTED)
return WBufferPopResult::DRAINED;
bool need_prepare_packet{false}, more_packets{false};
{
std::lock_guard wlock{this->write_queue_mutex};
if(this->resend_queue_head) {
result = this->resend_queue_head;
if(result->next) {
assert(this->resend_queue_tail != &result->next);
this->resend_queue_head = result->next;
} else {
assert(this->resend_queue_tail == &result->next);
this->resend_queue_head = nullptr;
this->resend_queue_tail = &this->resend_queue_head;
}
} else if(this->write_queue_head) {
result = this->write_queue_head;
if(result->next) {
assert(this->write_queue_tail != &result->next);
this->write_queue_head = result->next;
} else {
assert(this->write_queue_tail == &result->next);
this->write_queue_head = nullptr;
this->write_queue_tail = &this->write_queue_head;
}
need_prepare_packet = true;
} else {
return WBufferPopResult::DRAINED;
}
result->next = nullptr;
more_packets = this->resend_queue_head != nullptr || this->write_queue_head != nullptr;
}
if(need_prepare_packet)
this->prepare_outgoing_packet(result);
return more_packets ? WBufferPopResult::MORE_AVAILABLE : WBufferPopResult::DRAINED;
}
void VoiceClientConnection::execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next) {
std::deque<std::shared_ptr<connection::AcknowledgeManager::Entry>> buffers{};
std::string error{};
if (this->acknowledge_handler.execute_resend(now, next, buffers, error) < 0) {
debugMessage(client->getServerId(), "{} Failed to execute packet resend: {}", CLIENT_STR_LOG_PREFIX_(this->client), error);
if(this->client->state == ConnectionState::CONNECTED) {
this->client->disconnect(ViewReasonId::VREASON_TIMEOUT, config::messages::timeout::packet_resend_failed, nullptr, true);
} else {
this->client->close_connection(system_clock::now() + seconds(1));
}
} else if(!buffers.empty()) {
size_t send_count{0};
{
lock_guard wlock{this->write_queue_mutex};
for(auto& buffer : buffers) {
auto packet = (protocol::OutgoingServerPacket*) buffer->packet_ptr;
if(packet->next) continue; /* still in write queue (this shall not happen very often) */
if(&packet->next == this->write_queue_tail || &packet->next == this->resend_queue_tail) continue;
packet->ref(); /* for the write queue again */
*this->resend_queue_tail = packet;
this->resend_queue_tail = &packet->next;
send_count++;
buffer->resend_count++;
this->packet_statistics().send_command((protocol::PacketType) buffer->packet_type, buffer->packet_full_id);
}
}
logTrace(client->getServerId(), "{} Resending {} packets. Send actually {} packets.", CLIENT_STR_LOG_PREFIX_(client), buffers.size(), send_count);
this->triggerWrite();
}
}
void VoiceClientConnection::encrypt_write_queue() {
OutgoingServerPacket* packets_head, *packets_tail;
{
std::lock_guard wlock{this->write_queue_mutex};
packets_head = this->write_queue_head;
this->write_queue_head = nullptr;
this->write_queue_tail = &this->write_queue_head;
}
if(!packets_head) return;
auto packet = packets_head;
while(packet) {
this->prepare_outgoing_packet(packet);
packets_tail = packet;
packet = packet->next;
}
{
std::lock_guard wlock{this->write_queue_mutex};
*this->resend_queue_tail = packets_head;
this->resend_queue_tail = &packets_tail->next;
}
}
bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) {
while(true) {
{
std::lock_guard wlock{this->write_queue_mutex};
if(this->write_queue_head)
goto _wait;
if(this->resend_queue_head)
goto _wait;
}
break;
_wait:
if(until.time_since_epoch().count() != 0 && system_clock::now() > until)
return false;
threads::self::sleep_for(milliseconds(5));
}
return true;
}
void VoiceClientConnection::reset() {
{
std::lock_guard wlock{this->write_queue_mutex};
auto head = this->write_queue_head;
while(head) {
auto next = head->next;
head->unref();
head = next;
}
this->write_queue_head = nullptr;
this->write_queue_tail = &this->write_queue_head;
head = this->resend_queue_head;
while(head) {
auto next = head->next;
head->unref();
head = next;
}
this->resend_queue_head = nullptr;
this->resend_queue_tail = &this->resend_queue_head;
}
this->acknowledge_handler.reset();
this->crypt_handler.reset();
this->packet_id_manager.reset();
{
lock_guard buffer_lock(this->packet_buffer_lock);
for(auto& buffer : this->_command_fragment_buffers)
buffer.reset();
}
}
void VoiceClientConnection::force_insert_command(const pipes::buffer_view &buffer) {
CommandFragment fragment_entry{
0,
0,
PacketFlag::Unencrypted,
(uint32_t) buffer.length(),
buffer.own_buffer()
};
{
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];
unique_lock queue_lock(fragment_buffer.buffer_lock);
fragment_buffer.push_front(std::move(fragment_entry));
break; /* Maybe handle more than one command? Maybe some kind of time limit? */
}
auto voice_server = this->client->voice_server;
@ -608,34 +241,45 @@ void VoiceClientConnection::force_insert_command(const pipes::buffer_view &buffe
voice_server->schedule_command_handling(this->client);
}
void VoiceClientConnection::register_initiv_packet() {
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];
unique_lock buffer_lock(fragment_buffer.buffer_lock);
fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */
bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) {
return this->packet_encoder_.wait_empty_write_and_prepare_queue(until);
}
void VoiceClientConnection::reset() {
this->crypt_handler.reset();
{
std::unique_lock pc_lock{this->pending_commands_lock};
auto head = std::exchange(this->pending_commands_head, nullptr);
this->pending_commands_tail = &this->pending_commands_head;
pc_lock.unlock();
while(head) {
auto cmd = head->next_command;
ReassembledCommand::free(head);
head = cmd;
}
}
this->packet_decoder_.reset();
this->packet_encoder_.reset();
}
void VoiceClientConnection::force_insert_command(const pipes::buffer_view &buffer) {
auto command = ReassembledCommand::allocate(buffer.length());
memcpy(command->command(), buffer.data_ptr(), command->length());
this->enqueue_command_execution(command);
}
void VoiceClientConnection::send_packet(protocol::OutgoingServerPacket *packet) {
uint32_t full_id;
{
std::lock_guard id_lock{this->packet_id_mutex};
full_id = this->packet_id_manager.generate_full_id(packet->packet_type());
}
packet->set_packet_id(full_id & 0xFFFFU);
packet->generation = full_id >> 16U;
{
std::lock_guard qlock{this->write_queue_mutex};
*this->write_queue_tail = packet;
this->write_queue_tail = &packet->next;
}
this->packet_encoder_.send_packet(packet);
auto statistics = this->client ? this->client->connectionStatistics : nullptr;
if(statistics) {
auto category = stats::ConnectionStatistics::category::from_type(packet->packet_type());
statistics->logOutgoingPacket(category, packet->packet_length() + 96); /* 96 for the UDP packet overhead */
}
this->triggerWrite();
}
void VoiceClientConnection::send_packet(protocol::PacketType type, protocol::PacketFlag::PacketFlags flag, const void *payload, size_t payload_size) {
@ -647,128 +291,58 @@ void VoiceClientConnection::send_packet(protocol::PacketType type, protocol::Pac
this->send_packet(packet);
}
#define MAX_COMMAND_PACKET_PAYLOAD_LENGTH (487)
void VoiceClientConnection::send_command(const std::string_view &command, bool low, std::unique_ptr<threads::Future<bool>> ack_listener) {
bool own_data_buffer{false};
void* own_data_buffer_ptr; /* imutable! */
void VoiceClientConnection::send_command(const std::string_view &cmd, bool b, std::unique_ptr<threads::Future<bool>> cb) {
this->packet_encoder_.send_command(cmd, b, std::move(cb));
}
const char* data_buffer{command.data()};
size_t data_length{command.length()};
void VoiceClientConnection::callback_encode_crypt_error(void *ptr_this,
const PacketEncoder::CryptError &error,
const std::string &detail) {
auto connection = reinterpret_cast<VoiceClientConnection*>(ptr_this);
switch (error) {
case PacketEncoder::CryptError::ENCRYPT_FAILED:
logError(connection->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(connection->client), detail);
break;
uint8_t head_pflags{0};
PacketType ptype{low ? PacketType::COMMAND_LOW : PacketType::COMMAND};
protocol::OutgoingServerPacket *packets_head{nullptr};
protocol::OutgoingServerPacket **packets_tail{&packets_head};
case PacketEncoder::CryptError::KEY_GENERATION_FAILED:
logError(connection->client->getServerId(), "{} Failed to generate crypt key/nonce for sending a packet. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(connection->client));
break;
/* only compress "long" commands */
if(command.size() > 100) {
size_t max_compressed_payload_size = compression::qlz_compressed_size(command.data(), command.length());
auto compressed_buffer = ::malloc(max_compressed_payload_size);
size_t compressed_size{max_compressed_payload_size};
if(!compression::qlz_compress_payload(command.data(), command.length(), compressed_buffer, &compressed_size)) {
logCritical(0, "Failed to compress command packet. Dropping packet");
::free(compressed_buffer);
default:
assert(false);
return;
}
/* we don't need to make the command longer than it is */
if(compressed_size < command.length() || this->client->getType() == ClientType::CLIENT_TEAMSPEAK) { /* TS3 requires each splituped packet to be compressed (Update: Not 100% sure since there was another bug when discovering this but I've kept it since) */
own_data_buffer = true;
data_buffer = (char*) compressed_buffer;
own_data_buffer_ptr = compressed_buffer;
data_length = compressed_size;
head_pflags |= PacketFlag::Compressed;
} else {
::free(compressed_buffer);
}
}
}
uint8_t ptype_and_flags{(uint8_t) ((uint8_t) ptype | (uint8_t) PacketFlag::NewProtocol)};
if(data_length > MAX_COMMAND_PACKET_PAYLOAD_LENGTH) {
auto chunk_count = (size_t) ceil((float) data_length / (float) MAX_COMMAND_PACKET_PAYLOAD_LENGTH);
auto chunk_size = (size_t) ceil((float) data_length / (float) chunk_count);
void VoiceClientConnection::callback_request_write(void *ptr_this) {
auto connection = reinterpret_cast<VoiceClientConnection*>(ptr_this);
connection->triggerWrite();
}
while(true) {
auto bytes = min(chunk_size, data_length);
auto packet = protocol::allocate_outgoing_packet(bytes);
packet->type_and_flags = ptype_and_flags;
memcpy(packet->payload, data_buffer, bytes);
void VoiceClientConnection::callback_resend_failed(void *ptr_this, const shared_ptr<AcknowledgeManager::Entry> &entry) {
auto connection = reinterpret_cast<VoiceClientConnection*>(ptr_this);
*packets_tail = packet;
packets_tail = &packet->next;
debugMessage(connection->client->getServerId(), "{} Failed to execute packet resend of packet {}. Dropping connection.", CLIENT_STR_LOG_PREFIX_(connection->client), entry->packet_full_id);
data_length -= bytes;
if(data_length == 0) {
packet->type_and_flags |= PacketFlag::Fragmented;
break;
}
data_buffer += bytes;
}
packets_head->type_and_flags |= PacketFlag::Fragmented;
if(connection->client->state == ConnectionState::CONNECTED) {
connection->client->disconnect(ViewReasonId::VREASON_TIMEOUT, config::messages::timeout::packet_resend_failed, nullptr, true);
} else {
auto packet = protocol::allocate_outgoing_packet(data_length);
packet->type_and_flags = ptype_and_flags;
memcpy(packet->payload, data_buffer, data_length);
packets_head = packet;
packets_tail = &packet->next;
connection->client->close_connection(system_clock::now() + seconds(1));
}
}
void VoiceClientConnection::callback_resend_statistics(void *ptr_this, size_t send_count) {
auto connection = reinterpret_cast<VoiceClientConnection*>(ptr_this);
{
std::lock_guard id_lock{this->packet_id_mutex};
uint32_t full_id;
auto head = packets_head;
while(head) {
full_id = this->packet_id_manager.generate_full_id(ptype);
logTrace(connection->client->getServerId(), "{} Resending {} packets.", CLIENT_STR_LOG_PREFIX_(connection->client), send_count);
}
head->set_packet_id(full_id & 0xFFFFU);
head->generation = full_id >> 16U;
head = head->next;
}
}
packets_head->type_and_flags |= head_pflags;
void VoiceClientConnection::callback_outgoing_connection_statistics(void *ptr_this,
ts::stats::ConnectionStatistics::category::value category,
size_t send_count) {
auto connection = reinterpret_cast<VoiceClientConnection*>(ptr_this);
auto statistics = connection->client->connectionStatistics;
if(!statistics) return;
/* do this before the next ptr might get modified due to the write queue */
auto statistics = this->client ? this->client->connectionStatistics : nullptr;
/* general stats */
if(statistics) {
auto head = packets_head;
while(head) {
statistics->logOutgoingPacket(stats::ConnectionStatistics::category::COMMAND, head->packet_length() + 96); /* 96 for the UDP overhead */
head = head->next;
}
}
/* loss stats */
{
auto head = packets_head;
while(head) {
auto full_packet_id = (uint32_t) (head->generation << 16U) | head->packet_id();
this->packet_statistics_.send_command(head->packet_type(), full_packet_id);
/* increase a reference for the ack handler */
head->ref();
/* Even thou the packet is yet unencrypted, it will be encrypted with the next write. The next write will be before the next resend because the next ptr must be null in order to resend a packet */
if(&head->next == packets_tail)
this->acknowledge_handler.process_packet(ptype, full_packet_id, head, std::move(ack_listener));
else
this->acknowledge_handler.process_packet(ptype, full_packet_id, head, nullptr);
head = head->next;
}
}
{
std::lock_guard qlock{this->write_queue_mutex};
*this->write_queue_tail = packets_head;
this->write_queue_tail = packets_tail;
}
this->triggerWrite();
if(own_data_buffer)
::free(own_data_buffer_ptr);
statistics->logOutgoingPacket(category, send_count);
}

View File

@ -16,6 +16,8 @@
#include "protocol/AcknowledgeManager.h"
#include <protocol/generation.h>
#include "./PacketStatistics.h"
#include "./PacketDecoder.h"
#include "./PacketEncoder.h"
//#define LOG_ACK_SYSTEM
#ifdef LOG_ACK_SYSTEM
@ -38,33 +40,12 @@ namespace ts {
friend class server::VoiceServer;
friend class server::VoiceClient;
friend class server::POWHandler;
using PacketDecoder = server::server::udp::PacketDecoder;
using PacketEncoder = server::server::udp::PacketEncoder;
using ReassembledCommand = server::server::udp::ReassembledCommand;
using StatisticsCategory = stats::ConnectionStatistics::category;
public:
enum struct WBufferPopResult {
DRAINED,
MORE_AVAILABLE
};
struct CommandFragment {
uint16_t packet_id{0};
uint16_t packet_generation{0};
uint8_t packet_flags{0};
uint32_t payload_length : 24;
pipes::buffer payload{};
CommandFragment() { this->payload_length = 0; }
CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload) : packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags},
payload_length{payloadLength}, payload{std::move(payload)} {}
CommandFragment& operator=(const CommandFragment&) = default;
CommandFragment(const CommandFragment& other) = default;
CommandFragment(CommandFragment&&) = default;
};
static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer));
typedef protocol::PacketRingBuffer<CommandFragment, 32, CommandFragment> command_fragment_buffer_t;
typedef std::array<command_fragment_buffer_t, 2> command_packet_reassembler;
explicit VoiceClientConnection(server::VoiceClient*);
virtual ~VoiceClientConnection();
@ -77,26 +58,15 @@ namespace ts {
server::VoiceClient* getClient(){ return client; }
#ifdef VC_USE_READ_QUEUE
bool handleNextDatagram();
#endif
/* if the result is true, ownership has been transferred */
WBufferPopResult pop_write_buffer(protocol::OutgoingServerPacket*& /* packet */);
void execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next);
void encrypt_write_queue();
bool wait_empty_write_and_prepare_queue(std::chrono::time_point<std::chrono::system_clock> until = std::chrono::time_point<std::chrono::system_clock>());
protocol::PacketIdManager& getPacketIdManager() { return this->packet_id_manager; }
AcknowledgeManager& getAcknowledgeManager() { return this->acknowledge_handler; }
inline auto& get_incoming_generation_estimators() { return this->incoming_generation_estimators; }
void reset();
void force_insert_command(const pipes::buffer_view& /* payload */);
void register_initiv_packet();
[[nodiscard]] inline auto& packet_statistics() { return this->packet_statistics_; }
//buffer::SortedBufferQueue<protocol::ClientPacket>** getReadQueue() { return this->readTypedQueue; }
[[nodiscard]] inline auto& packet_decoder() { return this->packet_decoder_; }
[[nodiscard]] inline auto& packet_encoder() { return this->packet_encoder_; }
protected:
void handle_incoming_datagram(const pipes::buffer_view &buffer);
bool verify_encryption(const pipes::buffer_view& /* full packet */);
@ -105,44 +75,28 @@ namespace ts {
private:
server::VoiceClient* client = nullptr;
//Decryption / encryption stuff
CryptHandler crypt_handler; /* access to CryptHandler is thread save */
CompressionHandler compress_handler;
AcknowledgeManager acknowledge_handler;
std::atomic_bool should_reassembled_reschedule{}; /* this get checked as soon the command handle lock has been released so trylock will succeed */
//Handle stuff
void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */);
bool next_reassembled_command(std::unique_lock<std::recursive_timed_mutex> &buffer_execute_lock /* packet channel execute lock */, pipes::buffer & /* buffer*/, uint16_t& /* packet id */);
/* ---------- Write ---------- */
spin_mutex write_queue_mutex{};
protocol::OutgoingServerPacket* resend_queue_head{nullptr};
protocol::OutgoingServerPacket** resend_queue_tail{&resend_queue_head};
protocol::OutgoingServerPacket* write_queue_head{nullptr};
protocol::OutgoingServerPacket** write_queue_tail{&write_queue_head};
/* ---------- Processing ---------- */
/* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */
protocol::PacketIdManager packet_id_manager;
spin_mutex packet_id_mutex{};
/* this function is thread save :) */
std::atomic<uint8_t> prepare_process_count{0}; /* current thread count preparing a packet */
bool prepare_outgoing_packet(protocol::OutgoingServerPacket* /* packet */);
std::array<protocol::generation_estimator, 9> incoming_generation_estimators{}; /* implementation is thread save */
std::recursive_mutex packet_buffer_lock;
command_packet_reassembler _command_fragment_buffers;
static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) {
return packet_index & 0x1U; /* use 0 for command and 1 for command low */
}
server::client::PacketStatistics packet_statistics_{};
PacketDecoder packet_decoder_;
PacketEncoder packet_encoder_;
spin_mutex pending_commands_lock{};
ReassembledCommand* pending_commands_head{nullptr};
ReassembledCommand** pending_commands_tail{&pending_commands_head};
bool has_command_handling_scheduled{false}; /* locked by pending_commands_lock */
void enqueue_command_execution(ReassembledCommand*);
void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */);
static void callback_packet_decoded(void*, const protocol::ClientPacketParser&);
static void callback_command_decoded(void*, ReassembledCommand*&);
static void callback_send_acknowledge(void*, uint16_t, bool);
static void callback_request_write(void*);
static void callback_encode_crypt_error(void*, const PacketEncoder::CryptError&, const std::string&);
static void callback_resend_failed(void*, const std::shared_ptr<AcknowledgeManager::Entry>&);
static void callback_resend_statistics(void*, size_t);
static void callback_outgoing_connection_statistics(void*, StatisticsCategory::value, size_t /* bytes */);
};
}
}

View File

@ -1,6 +1,5 @@
#include <tommath.h>
#include <tomcrypt.h>
#include <misc/endianness.h>
#include <misc/digest.h>
#include <misc/base64.h>
#include <openssl/sha.h>
@ -9,7 +8,6 @@
#include "VoiceClient.h"
#include "../../InstanceHandler.h"
#include "../../server/VoiceServer.h"
using namespace std;
using namespace std::chrono;
@ -69,7 +67,7 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) {
state_lock.unlock();
this->connection->reset();
this->connection->register_initiv_packet();
this->connection->packet_decoder().register_initiv_packet();
this->connection->packet_statistics().reset_offsets();
this->crypto.protocol_encrypted = false;
@ -168,7 +166,7 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) {
this->handshake.state = HandshakeState::SUCCEEDED; /* we're doing the verify via TeamSpeak */
}
this->sendCommand0(initivexpand.build()); //If we setup the encryption now
this->connection->encrypt_write_queue();
this->connection->packet_encoder().encrypt_pending_packets();
}
{
@ -191,7 +189,8 @@ ts::command_result VoiceClient::handleCommandClientEk(Command& cmd) {
auto private_key = this->crypto.chain_data->chain->generatePrivateKey(this->crypto.chain_data->root_key, this->crypto.chain_data->root_index);
this->connection->getCryptHandler()->setupSharedSecretNew(this->crypto.alpha, this->crypto.beta, (char*) private_key.data(), client_key.data());
this->connection->acknowledge_handler.reset();
this->connection->packet_encoder().acknowledge_manager().reset();
this->crypto.protocol_encrypted = true;
this->sendAcknowledge(1); //Send the encrypted acknowledge (most the times the second packet; If not we're going into the resend loop)
return ts::command_result{error::ok};

View File

@ -85,6 +85,6 @@ void VoiceClient::handlePacketAck(const protocol::ClientPacketParser& packet) {
this->connection->packet_statistics().received_acknowledge((protocol::PacketType) packet.type(), target_id | (packet.estimated_generation() << 16U));
string error{};
if(!this->connection->acknowledge_handler.process_acknowledge(packet.type(), target_id, error))
if(!this->connection->packet_encoder().acknowledge_manager().process_acknowledge(packet.type(), target_id, error))
debugMessage(this->getServerId(), "{} Failed to handle acknowledge: {}", CLIENT_STR_LOG_PREFIX, error);
}

View File

@ -472,7 +472,7 @@ namespace ts::server::log {
void log_permission_add_value(
ServerId /* server id */,
const std::shared_ptr<ConnectedClient>& /* editor (may be null) */,
server::log::PermissionTarget /* target */,
ts::server::log::PermissionTarget /* target */,
uint64_t /* id1 */, const std::string& /* id1 name */,
uint64_t /* id2 */, const std::string& /* id2 name */,
const permission::PermissionTypeEntry& /* permission */,
@ -483,7 +483,7 @@ namespace ts::server::log {
void log_permission_add_grant(
ServerId /* server id */,
const std::shared_ptr<ConnectedClient>& /* editor (may be null) */,
server::log::PermissionTarget /* target */,
ts::server::log::PermissionTarget /* target */,
uint64_t /* id1 */, const std::string& /* id1 name */,
uint64_t /* id2 */, const std::string& /* id2 name */,
const permission::PermissionTypeEntry& /* permission */,
@ -492,7 +492,7 @@ namespace ts::server::log {
void log_permission_edit_value(
ServerId /* server id */,
const std::shared_ptr<ConnectedClient>& /* editor (may be null) */,
server::log::PermissionTarget /* target */,
ts::server::log::PermissionTarget /* target */,
uint64_t /* id1 */, const std::string& /* id1 name */,
uint64_t /* id2 */, const std::string& /* id2 name */,
const permission::PermissionTypeEntry& /* permission */,
@ -506,7 +506,7 @@ namespace ts::server::log {
void log_permission_edit_grant(
ServerId /* server id */,
const std::shared_ptr<ConnectedClient>& /* editor (may be null) */,
server::log::PermissionTarget /* target */,
ts::server::log::PermissionTarget /* target */,
uint64_t /* id1 */, const std::string& /* id1 name */,
uint64_t /* id2 */, const std::string& /* id2 name */,
const permission::PermissionTypeEntry& /* permission */,
@ -516,7 +516,7 @@ namespace ts::server::log {
void log_permission_remove_value(
ServerId /* server id */,
const std::shared_ptr<ConnectedClient>& /* editor (may be null) */,
server::log::PermissionTarget /* target */,
ts::server::log::PermissionTarget /* target */,
uint64_t /* id1 */, const std::string& /* id1 name */,
uint64_t /* id2 */, const std::string& /* id2 name */,
const permission::PermissionTypeEntry& /* permission */,
@ -527,7 +527,7 @@ namespace ts::server::log {
void log_permission_remove_grant(
ServerId /* server id */,
const std::shared_ptr<ConnectedClient>& /* editor (may be null) */,
server::log::PermissionTarget /* target */,
ts::server::log::PermissionTarget /* target */,
uint64_t /* id1 */, const std::string& /* id1 name */,
uint64_t /* id2 */, const std::string& /* id2 name */,
const permission::PermissionTypeEntry& /* permission */,

View File

@ -3,7 +3,7 @@
#include <mutex>
#include <netinet/in.h>
#include <pipes/buffer.h>
#include <src/client/voice/PrecomputedPuzzles.h>
#include <src/server/PrecomputedPuzzles.h>
#include <Definitions.h>
#include "VoiceServer.h"
#include "src/VirtualServer.h"

View File

@ -152,7 +152,7 @@ void VoiceServer::execute_resend(const std::chrono::system_clock::time_point &no
for(const auto& client : connections) {
auto connection = client->getConnection();
sassert(connection); /* its not possible that a client hasn't a connection! */
connection->execute_resend(now, next);
connection->packet_encoder().execute_resend(now, next);
}
}
@ -425,7 +425,7 @@ inline ssize_t write_datagram(IOData<MHS>& io, const sockaddr_storage& address,
}
void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) {
using WBufferPopResult = connection::VoiceClientConnection::WBufferPopResult;
using WBufferPopResult = server::udp::PacketEncoder::BufferPopResult;
auto event_handle = (io::IOEventLoopEntry*) _event_handle;
auto voice_server = event_handle->voice_server;
@ -453,7 +453,7 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle)
while(system_clock::now() <= write_timeout) {
packet = nullptr;
client_wbuffer_state = client->connection->pop_write_buffer(packet);
client_wbuffer_state = client->connection->packet_encoder().pop_write_buffer(packet);
if(!packet) {
assert(client_wbuffer_state == WBufferPopResult::DRAINED);
break;

2
shared

@ -1 +1 @@
Subproject commit b1f5620760351535a825ec3e618150c001b85799
Subproject commit a15eb9d25c5b3305bf7b06d6780e9dbf7d7f3bc0