// Source: TeaSpeak-Client/native/serverconnection/src/connection/ProtocolHandler.cpp
//
// Repository viewer metadata for the original file:
//   635 lines, 27 KiB
//   Language: C++

#ifdef WIN32
#include <WinSock2.h>
#endif
#include "ProtocolHandler.h"
#include "ServerConnection.h"
#include "Socket.h"
#include "../logger.h"
#include <misc/base64.h>
#include <misc/endianness.h>
#include <protocol/buffers.h>
#include <thread>
#include <iostream>
#include <protocol/Packet.h>
using namespace std;
using namespace std::chrono;
using namespace tc::connection;
using namespace ts::protocol;
using namespace ts;
/**
 * Creates a protocol handler bound to the given server connection.
 *
 * @param handle connection object this handler works for; the pointer is stored
 *               in the `handle` member and used later for socket access and callbacks.
 */
ProtocolHandler::ProtocolHandler(ServerConnection* handle) : handle(handle) {
    /* upper bound handed to the compression handler (128 * 1024, presumably bytes) */
    constexpr auto kMaxPacketSize = 128 * 1024;
    this->compression_handler.max_packet_size = kMaxPacketSize;
}
/* Nothing to release explicitly here; members clean up through their own destructors. */
ProtocolHandler::~ProtocolHandler() = default;
/**
 * Puts the handler back into its pre-connect state: connection state, handshake (pow)
 * data, crypto material, receive buffers, generation estimators, packet ids and the
 * ping bookkeeping are all cleared.
 *
 * NOTE(review): `_packet_buffer_overflow` is not touched here - confirm whether it is
 * cleared somewhere else before the handler gets reused.
 */
void ProtocolHandler::reset() {
    this->server_type = server_type::UNKNOWN;

    /* invalidates callbacks of disconnects which were issued before this reset */
    ++this->disconnect_id;

    this->client_id = 0;
    this->acknowledge_handler.reset();
    this->connection_state = connection_state::INITIALIZING;

    /* handshake (pow) state */
    {
        auto& handshake = this->pow;
        handshake.state = pow_state::COOKIE_SET;
        handshake.retry_count = 0;
        handshake.last_buffer = pipes::buffer{};
        handshake.last_resend = system_clock::time_point{};
        handshake.last_response = system_clock::time_point{};
        /* clearing the first byte drops the "set" marker, so fresh control data gets generated */
        handshake.client_control_data[0] = 0;
    }

    /* crypto handshake material */
    {
        auto& crypt_state = this->crypto;
        crypt_state.alpha[0] = 0;
        crypt_state.initiv_command = "";
        crypt_state.beta_length = 0;

        /* only free the identity when a key is actually present, then zero the struct */
        if(crypt_state.identity.k) {
            ecc_free(&crypt_state.identity);
        }
        memset(&crypt_state.identity, 0, sizeof(crypt_state.identity));
    }

    /* drop everything which is still queued for processing */
    for(auto& packet_buffer : this->_packet_buffers) {
        lock_guard lock(packet_buffer.buffer_lock);
        packet_buffer.reset();
    }

    this->crypt_setupped = false;

    for(auto& estimator : this->incoming_generation_estimators) {
        estimator.reset();
    }

    this->_packet_id_manager.reset();
    this->crypt_handler.reset();
    this->ping.ping_received_timestamp = system_clock::time_point{};
}
/**
 * Starts the connect sequence: switches to INIT_LOW, remembers when the attempt began,
 * sends the initial cookie request and queues the client "initiv" command packet.
 */
void ProtocolHandler::connect() {
    this->connection_state = connection_state::INIT_LOW;
    this->connect_timestamp = system_clock::now(); /* reference point for the connect timeout */
    this->pow_send_cookie_get();

    /* the initiv command is sent as a regular command packet using the new protocol flag */
    auto initiv = this->generate_client_initiv();
    auto initiv_packet = make_shared<ClientPacket>(PacketTypeInfo::Command, pipes::buffer_view{initiv.data(), initiv.size()});
    initiv_packet->enable_flag(PacketFlag::NewProtocol);
    this->send_packet(initiv_packet);
}
/**
 * Periodic housekeeping. While not disconnected it
 *  - re-sends the last init packet once per second,
 *  - aborts the connect attempt after 15 seconds in INIT_LOW / INIT_HIGH,
 *  - force-closes a pending disconnect after 5 seconds,
 *  - triggers packet resends,
 *  - and, once connected, sends a ping every second and closes the connection when
 *    no ping response was registered for 30 seconds.
 */
void ProtocolHandler::execute_tick() {
    const auto now = system_clock::now();

    if(this->connection_state >= connection_state::DISCONNECTED) {
        return;
    }

    /* init packet resend */
    const bool init_packet_pending = !this->pow.last_buffer.empty();
    if(init_packet_pending && this->pow.last_resend < now - seconds(1)) {
        this->pow.last_resend = now;
        this->send_packet(make_shared<ClientPacket>(PacketTypeInfo::Init1, PacketFlag::Unencrypted, this->pow.last_buffer));
    }

    /* connect timeout */
    if(this->connection_state == connection_state::INIT_LOW || this->connection_state == connection_state::INIT_HIGH) {
        const bool connect_timed_out = this->connect_timestamp < now - seconds(15);
        if(connect_timed_out) {
            this->handle->call_connect_result.call(this->handle->errors.register_error("timeout (" + to_string(this->connection_state) + ")"), true);
            this->handle->close_connection();
            return;
        }
    }

    /* disconnect timeout */
    if(this->connection_state == connection_state::DISCONNECTING && this->disconnect_timestamp < now - seconds(5)) {
        this->handle->close_connection();
        return;
    }

    this->execute_resend();

    /* everything below is ping handling and only applies to an established connection */
    if(this->connection_state != connection_state::CONNECTED) {
        return;
    }

    if(this->ping.ping_send_timestamp + seconds(1) < now) {
        this->ping_send_request();
    }

    const bool ping_timestamp_set = this->ping.ping_received_timestamp.time_since_epoch().count() > 0;
    if(!ping_timestamp_set) {
        /* no timestamp recorded yet: start the timeout window now */
        this->ping.ping_received_timestamp = now;
        return;
    }

    if(now - this->ping.ping_received_timestamp > seconds(30)) {
        this->handle->execute_callback_disconnect.call(tr("ping timeout"), true);
        this->handle->close_connection();
        return;
    }
}
void ProtocolHandler::execute_resend() {
if(this->connection_state >= connection_state::DISCONNECTED)
return;
std::deque<std::shared_ptr<ts::connection::AcknowledgeManager::Entry>> buffers;
auto now = system_clock::now();
system_clock::time_point next = now + seconds(5); /* in real we're doing it all 500ms */
string error;
auto resended = this->acknowledge_handler.execute_resend(now, next, buffers, error);
if(resended < 0) {
log_error(category::connection, tr("Failed to receive acknowledge: {}"), error);
this->handle->execute_callback_disconnect(tr("packet resend failed"));
this->handle->close_connection();
return;
}
//log_trace(category::connection, tr("Resended {}"), resended);
auto socket = this->handle->get_socket();
if(socket) {
for(const auto& buffer : buffers)
socket->send_message(buffer->buffer);
}
this->handle->schedule_resend(next);
}
/**
 * Handles one datagram received from the server.
 *
 * Steps, in order:
 *  1. drop it when we're disconnected or when it is shorter than the packet meta data,
 *  2. parse it; INIT1 packets are passed straight to handlePacketInit,
 *  3. estimate the generation id and, for command packets, verify that the packet id
 *     fits into the receive window (duplicates are acknowledged again),
 *  4. decrypt the packet (default key/nonce until the crypt handler has been set up),
 *  5. reject unencrypted command packets,
 *  6. enqueue the packet, acknowledge command packets and run handle_packets() until
 *     it reports that nothing is left to do.
 *
 * @param buffer raw datagram as received from the socket
 */
void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) {
    if(this->connection_state >= connection_state::DISCONNECTED) {
        log_warn(category::connection, tr("Dropping received packet. We're already disconnected."));
        return;
    }

    if(buffer.length() < ServerPacket::META_SIZE) {
        log_error(category::connection, tr("Received a packet which is too small. ({})"), buffer.length());
        return;
    }

    std::shared_ptr<ts::protocol::ServerPacket> packet{ts::protocol::ServerPacket::from_buffer(buffer).release()};
    auto packet_type = packet->type();
    auto packet_id = packet->packetId();
    const auto type_index = packet_type.type();

    /* command packets must be processed in packet id order */
    const bool ordered = type_index == protocol::COMMAND || type_index == protocol::COMMAND_LOW;
    const bool is_command = packet_type == PacketTypeInfo::Command || packet_type == PacketTypeInfo::CommandLow;
    //log_trace(category::connection, tr("Received packet {} with id {}"), packet->type().name(), packet->packetId());

    /*
    if(ordered)
        if(rand() & 1)
            return;
    */

    /* special handling: init packets bypass the buffers completely */
    if(type_index == protocol::INIT1) {
        this->handlePacketInit(packet);
        return;
    }

    if(type_index < 0 || type_index >= this->_packet_buffers.size()) {
        log_error(category::connection, tr("Received packet with invalid type. ({})"), type_index);
        return;
    }

    auto& read_queue = this->_packet_buffers[type_index];
    packet->generationId(this->incoming_generation_estimators[type_index].visit_packet(packet_id));
    [[maybe_unused]] auto generation = packet->generationId();

    if(ordered) {
        unique_lock queue_lock(read_queue.buffer_lock);

        const auto index_state = read_queue.accept_index(packet_id);
        if(index_state != 0) { /* the packet id is outside of the window we currently accept */
            const bool underflow = index_state == -1;
            log_error(category::connection, tr("Failed to register command packet ({}) (Index: {} Current index: {})"), underflow ? tr("underflow") : tr("overflow"), packet_id, read_queue.current_index());

            /* We already got that packet, but the remote side doesn't know that yet, so acknowledge it again. */
            if(underflow && is_command) {
                this->send_acknowledge(packet_id, packet_type == PacketTypeInfo::CommandLow);
            }
            return;
        }
    }

    packet->setEncrypted(!packet->has_flag(PacketFlag::Unencrypted));
    if(is_command) {
        packet->setCompressed(packet->has_flag(PacketFlag::Compressed));
    }
    //NOTICE I found out that the Compressed flag is set if the packet contains an audio header

    if(packet->isEncrypted()) {
        std::string error;
        ts::connection::CryptHandler::key_t crypt_key{};
        ts::connection::CryptHandler::nonce_t crypt_nonce{};

        if(this->crypt_setupped) {
            if(!this->crypt_handler.generate_key_nonce(false, type_index, packet->packetId(), packet->generationId(), crypt_key, crypt_nonce)) {
                log_error(category::connection, tr("Failed to generate crypt key/nonce. This should never happen! Dropping packet."));
                return;
            }
        } else {
            /* until the crypt handler has been set up the default key and nonce are used */
            crypt_key = ts::connection::CryptHandler::default_key;
            crypt_nonce = ts::connection::CryptHandler::default_nonce;
        }

        auto mac = packet->mac().data_ptr<void>();
        auto header = packet->header().data_ptr<void>();
        auto payload = packet->data().data_ptr<void>();

        const bool decrypted = this->crypt_handler.decrypt(
                header, packet->header_length(),
                payload, packet->data_length(),
                mac,
                crypt_key, crypt_nonce,
                error
        );
        if(!decrypted) {
            if(!this->crypt_setupped) {
                log_error(category::connection, tr("Failed to decrypt packet ({}), with default key."), packet_type.name());
            } else {
                log_trace(category::connection, tr("Failed to decrypt packet {}."), packet_type.name());
            }
            return;
        }
    }

    if(is_command && packet->has_flag(PacketFlag::Unencrypted)) {
        log_warn(category::connection, tr("Received unencrypted command packet! Dropping packet."));
        return;
    }

    {
        unique_lock queue_lock(read_queue.buffer_lock);

        /* `packet` is handed over to the queue here and must not be used afterwards */
        if(ordered) {
            //log_trace(category::connection, tr("Inserting packet {} with id {}"), packet->type().name(), packet->packetId());
            if(!read_queue.insert_index(packet_id, std::move(packet))) {
                log_warn(category::connection, tr("Failed to insert ordered packet into queue. ({} | {} | {})"), packet_type.name(), read_queue.current_index(), packet_id);
                return;
            }
        } else if(!read_queue.push_back(std::move(packet))) {
            log_warn(category::connection, tr("Failed to insert unordered packet into queue. ({} | {} | {})"), packet_type.name(), read_queue.current_index(), packet_id);
            return;
        } else {
            read_queue.index_set(packet_id); /* we may have skipped a packet id */
        }
    }

    /* only acknowledge once the packet has actually been registered */
    if(is_command) {
        this->send_acknowledge(packet_id, packet_type == PacketTypeInfo::CommandLow);
    }

    while(this->handle_packets()) { }
}
/**
 * Processes at most one received packet.
 *
 * The packet buffers are scanned round-robin, starting at `_packet_buffers_index`.
 * The first buffer which has a packet at its front and whose locks could be acquired
 * without blocking is selected. From that buffer one packet is taken (fragmented
 * command packets are merged into a single packet first), decompressed and dispatched
 * to the matching handlePacket* method.
 *
 * @return true if there is more work pending (another buffer had a packet at its front
 *         during the scan, or the selected buffer still has one after the current packet
 *         was removed), so the caller should call this method again. false otherwise,
 *         including all early exits.
 */
bool ProtocolHandler::handle_packets() {
    if(this->connection_state >= connection_state::DISCONNECTED) {
        log_warn(category::connection, tr("Don't handle received packets because we're already disconnected."));
        return false;
    }

    bool reexecute_handle = false;
    shared_ptr<ServerPacket> current_packet = nullptr;
    packet_buffer_t* buffer = nullptr;
    unique_lock<std::recursive_timed_mutex> buffer_lock;
    unique_lock<std::recursive_timed_mutex> buffer_execute_lock;
    std::string error = "success";

    { /* select the buffer we're going to work on */
        auto base_index = this->_packet_buffers_index;
        auto select_index = base_index;
        auto max_index = this->_packet_buffers.size();

        for(size_t index = 0; index < max_index; index++) {
            /* counts the buffers visited up to and including the selected one */
            if(!buffer) select_index++;

            /*
             * Resolve the buffer index exactly once per iteration. The log statements below
             * used to evaluate `base_index++` a second time, which skipped the following
             * buffer for this pass and logged the wrong index.
             */
            const auto buffer_index = base_index++ % max_index;
            auto& buf = this->_packet_buffers[buffer_index];

            unique_lock ring_lock(buf.buffer_lock, try_to_lock);
            if(!ring_lock.owns_lock()) {
                log_debug(category::connection, tr("Skipping packet type {} for handling"), buffer_index);
                continue;
            }

            if(buf.front_set()) {
                if(!buffer) { /* lets still test for reexecute */
                    buffer_execute_lock = unique_lock(buf.execute_lock, try_to_lock);
                    if(!buffer_execute_lock.owns_lock()) {
                        log_debug(category::connection, tr("Skipping packet type {} for handling (already executed)"), buffer_index);
                        continue;
                    }

                    buffer_lock = move(ring_lock);
                    buffer = &buf;
                } else {
                    /* a second buffer has work pending as well: tell the caller to come back */
                    reexecute_handle |= true;
                    break;
                }
            }
        }

        /* the next run starts behind the buffer selected now; guarantees that we will not hang up on commands */
        this->_packet_buffers_index = select_index % max_index;
    }

    if(buffer) {
        uint16_t sequence_length = 0;
        current_packet = buffer->slot_value(sequence_length++);

        if(current_packet) {
            if(this->_packet_buffer_overflow[current_packet->type().type()]) {
                /*
                 * A previous fragment sequence was too long and has been dropped.
                 * Skip everything up to (and including) the next packet carrying the Fragmented flag.
                 */
                auto& overflow_flag = this->_packet_buffer_overflow[current_packet->type().type()];
                while(current_packet && !current_packet->has_flag(PacketFlag::Fragmented) && sequence_length < buffer->capacity())
                    current_packet = buffer->slot_value(sequence_length++);

                while(buffer->front_set())
                    if(buffer->pop_front() == current_packet)
                        break;

                /* stay in overflow mode as long as no Fragmented packet has been found */
                overflow_flag = !current_packet || !current_packet->has_flag(PacketFlag::Fragmented);
                if(!overflow_flag) {
                    log_info(category::connection, tr("Recovered successfully from too long packet."));
                }
                return false;
            }

            if((current_packet->type() == PacketTypeInfo::Command || current_packet->type() == PacketTypeInfo::CommandLow) && current_packet->has_flag(PacketFlag::Fragmented)) {
                /*
                 * The first packet opens a fragment sequence. Walk the following slots until the
                 * closing packet (the next one with the Fragmented flag) shows up. An empty slot
                 * means the sequence is still incomplete and leaves current_packet as nullptr.
                 */
                do {
                    if(sequence_length >= buffer->capacity()) {
                        log_error(category::connection, tr("Received fragmented packets which have a too long order (> {}). Ignoring that command and dropping its segments."), buffer->capacity());
                        while(buffer->front_set())
                            buffer->pop_front();

                        //TODO: Log to the client!
                        //this->handle->execute_callback_disconnect.call(tr("received a too long packet"), true);
                        //this->disconnect("received a too long packet");
                        this->_packet_buffer_overflow[current_packet->type().type()] = true;
                        return false;
                    }
                    current_packet = buffer->slot_value(sequence_length++);
                } while(current_packet && !current_packet->has_flag(PacketFlag::Fragmented));
            }
        } else {
            log_critical(category::connection, tr("buffer->slot_value(sequence_length++) returned nullptr!"));
            //FIXME!
            //logCritical(this->client->getServer()->getServerId(), "buffer->slot_value(sequence_length++) returned nullptr!")
        }

        if(current_packet) { //We could reconstruct a new packet!
            if(sequence_length > 1) { //We have to merge
                vector<pipes::buffer> append;
                append.reserve(sequence_length - 1);

                uint16_t packet_count = 0;
                current_packet = buffer->pop_front(); /* the head packet receives the payload of all following fragments */
                packet_count++;

                do {
                    auto packet = buffer->pop_front();
                    packet_count++;
                    if(!packet) {
                        log_critical(category::connection, tr("readQueue->peekNext(seqIndex++) => nullptr_t!"));
                        return false;
                    }

                    append.push_back(packet->data());
                    if(packet->has_flag(PacketFlag::Fragmented)) break; /* closing fragment */
                } while(packet_count < sequence_length);

                if(packet_count != sequence_length) {
                    log_critical(category::connection, tr("seqIndex != index failed! seqIndex: {} seqLength: {} This may cause a application crash!"), packet_count, sequence_length);
                    sequence_length = packet_count;
                    current_packet = nullptr;
                } else {
                    current_packet->append_data(append);
                }
            } else {
                if(buffer->pop_front() != current_packet) {
                    log_critical(category::connection, tr("buffer->pop_front() != current_packet failed."));
                }
            }

            reexecute_handle |= buffer->front_set();
            buffer_lock.unlock(); //We got our packet so release it

            if(current_packet) {
                if(!this->compression_handler.progressPacketIn(current_packet.get(), error)) {
                    log_error(category::connection, tr("Failed to decompress received packet. Error: {}"), error);
                    current_packet = nullptr;
                }
            }
        }
    }

    if(current_packet) {
        auto startTime = chrono::system_clock::now();
        try {
            if(current_packet->type() == PacketTypeInfo::Command || current_packet->type() == PacketTypeInfo::CommandLow)
                this->handlePacketCommand(current_packet);
            else if(current_packet->type() == PacketTypeInfo::Ack || current_packet->type() == PacketTypeInfo::AckLow)
                this->handlePacketAck(current_packet);
            else if(current_packet->type() == PacketTypeInfo::Voice || current_packet->type() == PacketTypeInfo::VoiceWhisper)
                this->handlePacketVoice(current_packet);
            else if(current_packet->type() == PacketTypeInfo::Ping || current_packet->type() == PacketTypeInfo::Pong)
                this->handlePacketPing(current_packet);
        } catch (std::exception& ex) {
            log_critical(category::connection, tr("Exception reached root tree! {}"), ex.what());
        }

        /* report slow handlers; command packets are excluded from this warning */
        auto end = chrono::system_clock::now();
        if(end - startTime > chrono::milliseconds(5)) {
            if(current_packet->type() != PacketTypeInfo::Command && current_packet->type() != PacketTypeInfo::CommandLow) {
                log_warn(category::connection,
                         tr("Handling of packet {} ({}) needed longer than expected. Handle time {}ms"),
                         current_packet->packetId(), current_packet->type().name(), duration_cast<milliseconds>(end - startTime).count());
            }
        }
    }

    if(buffer_execute_lock.owns_lock())
        buffer_execute_lock.unlock();
    return reexecute_handle;
}
/**
 * Encodes a client packet into one or more wire buffers.
 *
 * Compressable packets get compressed first. When the payload exceeds the maximal length
 * of the packet type it is split into fragments, each of which is encoded by a recursive
 * call. Otherwise the packet receives its client/packet id (unless already branded),
 * is encrypted (or gets the default MAC when flagged Unencrypted), appended to `result`
 * and handed to the acknowledge handler.
 *
 * @param result receives the encoded buffers in send order. Existing entries are kept.
 *               On failure it may already contain buffers of fragments encoded before the
 *               failing one; callers must not send them.
 * @param packet the packet to encode; it is modified (flags, ids, payload)
 * @return true on success; false if compression, key generation or encryption failed, if
 *         the packet is too long and not fragmentable, or if any fragment failed to encode.
 */
bool ProtocolHandler::create_datagram_packets(std::vector<pipes::buffer> &result, const std::shared_ptr<ts::protocol::ClientPacket> &packet) {
    string error = "success";

    /* fragments are parts of an already compressed payload and must not be compressed again */
    if(packet->type().compressable() && !packet->memory_state.fragment_entry) {
        packet->enable_flag(PacketFlag::Compressed);
        if(!this->compression_handler.progressPacketOut(packet.get(), error)) {
            log_error(category::connection, tr("Could not compress outgoing packet.\nThis could cause fatal failed for the client.\nError: {}"), error);
            return false;
        }
    }

    if(packet->data().length() > packet->type().max_length()) {
        if(!packet->type().fragmentable()) {
            log_error(category::connection, tr("We've tried to send a too long, not fragmentable packet. Dropping packet of type {} with length {}"), packet->type().name(), packet->data().length());
            return false;
        }

        std::vector<shared_ptr<ClientPacket>> siblings;
        siblings.reserve(8);

        { //Split packets
            auto buffer = packet->data();

            const auto max_length = packet->type().max_length();
            /* full sized fragments as long as more than two fragments worth of data is left */
            while(buffer.length() > max_length * 2) {
                siblings.push_back(make_shared<ClientPacket>(packet->type(), buffer.view(0, max_length).dup(ts::buffer::allocate_buffer(max_length))));
                buffer = buffer.range(max_length);
            }

            if(buffer.length() > max_length) { //Divide rest by 2
                siblings.push_back(make_shared<ClientPacket>(packet->type(), buffer.view(0, buffer.length() / 2).dup(ts::buffer::allocate_buffer(buffer.length() / 2))));
                buffer = buffer.range(buffer.length() / 2);
            }
            siblings.push_back(make_shared<ClientPacket>(packet->type(), buffer));

            for(const auto& frag : siblings) {
                frag->setFragmentedEntry(true);
                frag->enable_flag(PacketFlag::NewProtocol);
            }
        }

        assert(siblings.size() >= 2);
        /* the Fragmented flag marks the first and the last packet of the sequence */
        siblings.front()->enable_flag(PacketFlag::Fragmented);
        if(packet->has_flag(PacketFlag::Compressed))
            siblings.front()->enable_flag(PacketFlag::Compressed);

        siblings.back()->enable_flag(PacketFlag::Fragmented);

        if(packet->getListener())
            siblings.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :)

        result.reserve(result.size() + siblings.size());
        for(const auto& frag : siblings) {
            /* A partially encoded sequence is useless to the receiver, so report the failure instead of hiding it. */
            if(!this->create_datagram_packets(result, frag))
                return false;
        }
        return true;
    }

    if(!packet->memory_state.id_branded) {
        packet->clientId(this->client_id);
        if(packet->type().type() == PacketType::INIT1) {
            /* init packets always use this fixed packet id */
            packet->applyPacketId(101, 0);
        } else {
            packet->applyPacketId(this->_packet_id_manager);
        }
        //log_trace(category::connection, tr("Packet {} got packet id {}"), packet->type().name(), packet->packetId());
    }

    if(packet->has_flag(PacketFlag::Unencrypted)) {
        this->crypt_handler.write_default_mac(packet->mac().data_ptr());
    } else {
        ts::connection::CryptHandler::key_t crypt_key{};
        ts::connection::CryptHandler::nonce_t crypt_nonce{};

        if(!this->crypt_setupped) {
            /* until the crypt handler has been set up the default key and nonce are used */
            crypt_key = ts::connection::CryptHandler::default_key;
            crypt_nonce = ts::connection::CryptHandler::default_nonce;
        } else {
            if(!this->crypt_handler.generate_key_nonce(true, packet->type().type(), packet->packetId(), packet->generationId(), crypt_key, crypt_nonce)) {
                log_error(category::connection, tr("Failed to generate crypt key/nonce. Dropping packet"));
                return false;
            }
        }

        auto crypt_result = this->crypt_handler.encrypt(packet->header().data_ptr(), packet->header().length(),
                                                        packet->data().data_ptr(), packet->data().length(),
                                                        packet->mac().data_ptr(),
                                                        crypt_key, crypt_nonce, error);
        if(!crypt_result) {
            log_error(category::connection, tr("Failed to encrypt packet: {}"), error);
            return false;
        }
    }

    /*
    #ifndef CONNECTION_NO_STATISTICS
    if(this->client && this->client->getServer())
        this->client->connectionStatistics->logOutgoingPacket(packet);
    #endif
    */

    result.push_back(packet->buffer());
    this->acknowledge_handler.process_packet(*packet);
    return true;
}
/**
 * Builds a command packet out of the given command and sends it.
 *
 * @param cmd          the command to send
 * @param ack_callback optional; invoked with the result delivered through the packet's
 *                     listener (true/false). May be empty.
 */
void ProtocolHandler::send_command(const ts::Command &cmd, const std::function<void(bool)> &ack_callback) {
    auto data = cmd.build();
    auto packet = make_shared<ClientPacket>(PacketTypeInfo::Command, pipes::buffer_view{data.data(), data.size()});

    /* A listener is attached in every case, even when the caller passed no callback. */
    {
        /* timestamps are only needed by the (disabled) trace output below */
        auto begin = chrono::system_clock::now();

        packet->setListener(make_unique<threads::Future<bool>>());
        packet->getListener()->waitAndGetLater([ack_callback, begin](bool f) {
            [[maybe_unused]] auto end = chrono::system_clock::now();

            if(ack_callback) {
                ack_callback(f);
            }
            //log_trace(category::connection, tr("Time needed for command: {}ms. Success: {}"), chrono::duration_cast<chrono::milliseconds>(end - begin).count(), f);
        });
    }

    packet->enable_flag(PacketFlag::NewProtocol);
    this->send_packet(packet);
}
/**
 * Encodes the packet into datagrams and writes all of them to the connection's socket.
 * Nothing is sent when the encoding fails, yields no datagram or when there is no socket.
 *
 * @param packet the packet to send
 */
void ProtocolHandler::send_packet(const std::shared_ptr<ts::protocol::ClientPacket> &packet) {
    std::vector<pipes::buffer> datagrams;

    const bool encoded = this->create_datagram_packets(datagrams, packet);
    if(!encoded || datagrams.empty()) {
        log_error(category::connection, tr("Failed to create datagram packets!"));
        return;
    }

    /*
     * Debugging aid, permanently switched off: appends an additional "whoami" command and
     * reverses the datagram order to provoke out-of-order delivery.
     */
    constexpr bool kSendReorderTrap = false;
    if(kSendReorderTrap && packet->type() == protocol::PacketTypeInfo::Command && this->connection_state == connection_state::CONNECTED) {
        ts::Command cmd{"whoami"};
        auto data = cmd.build();
        auto trap_packet = make_shared<ClientPacket>(PacketTypeInfo::Command, pipes::buffer_view{data.data(), data.size()});
        if(!this->create_datagram_packets(datagrams, trap_packet)) {
            log_error(category::connection, tr("failed to encode trap"));
        }
        std::reverse(datagrams.begin(), datagrams.end());
    }
    //log_trace(category::connection, tr("Split up {} {} to {} packets. Ack waiting: {}"), packet->packetId(), packet->type().name(), result.size(), this->acknowledge_handler.awaiting_acknowledge());

    auto socket = this->handle->get_socket();
    if(!socket) {
        log_error(category::connection, tr("Failed to get socket!"));
        return;
    }

    for(const auto& datagram : datagrams) {
        socket->send_message(datagram);
    }
}
/**
 * Sends an acknowledge for a received command packet.
 *
 * @param packet_id id of the packet to acknowledge; written into the payload via le2be16
 * @param low       true sends an AckLow packet, false a regular Ack packet
 */
void ProtocolHandler::send_acknowledge(uint16_t packet_id, bool low) {
    char payload[2];
    le2be16(packet_id, payload);

    const auto& ack_type = low ? protocol::PacketTypeInfo::AckLow : protocol::PacketTypeInfo::Ack;
    auto ack_packet = make_shared<protocol::ClientPacket>(ack_type, 0, pipes::buffer_view{payload, 2});

    /*
     * The NewProtocol flag is intentionally not set, regardless of the connection state:
     * LivingBots DDOS protection dont want a new protocol here!
     */
    //packet->toggle(protocol::PacketFlag::NewProtocol, !low);

    this->send_packet(ack_packet);
}
/**
 * Marks the handler as disconnected and clears all receive buffers.
 */
void ProtocolHandler::do_close_connection() {
    this->connection_state = connection_state::DISCONNECTED;

    /* each buffer is cleared while holding its own lock */
    for(auto& packet_buffer : this->_packet_buffers) {
        lock_guard lock(packet_buffer.buffer_lock);
        packet_buffer.clear();
    }
}
void ProtocolHandler::disconnect(const std::string &reason) {
if(this->connection_state >= connection_state::DISCONNECTING)
return;
this->connection_state = connection_state::DISCONNECTING;
this->disconnect_timestamp = system_clock::now();
auto did = ++this->disconnect_id;
Command cmd("clientdisconnect");
cmd["reasonmsg"] = reason;
this->send_command(cmd, [&, did](bool success){
/* if !success then we'll have prop already triggered the timeout and this here is obsolete */
if(success && this->connection_state == connection_state::DISCONNECTING && this->disconnect_id == did)
this->handle->close_connection();
});
}