From 94c0208d45fe9429ea4ccb83f783e7a6b0f2d630 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sat, 4 Apr 2020 01:09:33 +0200 Subject: [PATCH] Added a recovery strategy for too long packets --- .../src/connection/ProtocolHandler.cpp | 34 ++++++++++++++++--- .../src/connection/ProtocolHandler.h | 4 ++- native/serverconnection/test/js/main.ts | 22 ++++++++++-- 3 files changed, 51 insertions(+), 9 deletions(-) diff --git a/native/serverconnection/src/connection/ProtocolHandler.cpp b/native/serverconnection/src/connection/ProtocolHandler.cpp index 9a48137..65aab72 100644 --- a/native/serverconnection/src/connection/ProtocolHandler.cpp +++ b/native/serverconnection/src/connection/ProtocolHandler.cpp @@ -248,9 +248,6 @@ void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) { } } - if(packet->type() == PacketTypeInfo::Command || packet->type() == PacketTypeInfo::CommandLow) - this->send_acknowledge(packet->packetId(), packet->type() == PacketTypeInfo::CommandLow); - { unique_lock queue_lock(read_queue.buffer_lock); @@ -258,10 +255,12 @@ void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) { //log_trace(category::connection, tr("Inserting packet {} with id {}"), packet->type().name(), packet->packetId()); if(!read_queue.insert_index(packet_id, std::forward>(packet))) { log_warn(category::connection, tr("Failed to insert ordered packet into queue. ({} | {} | {})"), packet_type.name(), read_queue.current_index(), packet_id); + return; } } else { if(!read_queue.push_back(std::forward>(packet))) { log_warn(category::connection, tr("Failed to insert unordered packet into queue. ({} | {} | {})"), packet_type.name(), read_queue.current_index(), packet_id); + return; /* return; dont stop here because we've to progress the packets */ } else { read_queue.index_set(packet_id); /* may we've skipped one packet id */ @@ -269,6 +268,10 @@ void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) { } } + /* only send an ack when we actually succeeded registering the packet */ + if(packet_type == PacketTypeInfo::Command || packet_type == PacketTypeInfo::CommandLow) + this->send_acknowledge(packet_id, packet_type == PacketTypeInfo::CommandLow); + while(this->handle_packets()); } @@ -325,11 +328,32 @@ bool ProtocolHandler::handle_packets() { current_packet = buffer->slot_value(sequence_length++); if(current_packet) { + if(this->_packet_buffer_overflow[current_packet->type().type()]) { + auto& overflow_flag = this->_packet_buffer_overflow[current_packet->type().type()]; + + while(current_packet && !current_packet->has_flag(PacketFlag::Fragmented) && sequence_length < buffer->capacity()) + current_packet = buffer->slot_value(sequence_length++); + + while(buffer->front_set()) + if(buffer->pop_front() == current_packet) + break; + + overflow_flag = !current_packet || !current_packet->has_flag(PacketFlag::Fragmented); + if(!overflow_flag) { + log_info(category::connection, tr("Recovered successfully from too long packet.")); + } + return false; + } if((current_packet->type() == PacketTypeInfo::Command || current_packet->type() == PacketTypeInfo::CommandLow) && current_packet->has_flag(PacketFlag::Fragmented)) { do { if(sequence_length >= buffer->capacity()) { - log_warn(category::connection, tr("Received fragmented packets which have a too long order. Dropping queue, which will cause a client drop.")); - buffer->clear(); + log_error(category::connection, tr("Received fragmented packets which have a too long order (> {}). Ignoring that command and dropping its segments."), buffer->capacity()); + while(buffer->front_set()) + buffer->pop_front(); + //TODO: Log to the client! + //this->handle->execute_callback_disconnect.call(tr("received a too long packet"), true); + //this->disconnect("received a too long packet"); + this->_packet_buffer_overflow[current_packet->type().type()] = true; return false; } current_packet = buffer->slot_value(sequence_length++); diff --git a/native/serverconnection/src/connection/ProtocolHandler.h b/native/serverconnection/src/connection/ProtocolHandler.h index f990eba..fbc6557 100644 --- a/native/serverconnection/src/connection/ProtocolHandler.h +++ b/native/serverconnection/src/connection/ProtocolHandler.h @@ -53,7 +53,7 @@ namespace tc { }; class ProtocolHandler { - typedef ts::protocol::PacketRingBuffer packet_buffer_t; + typedef ts::protocol::PacketRingBuffer packet_buffer_t; typedef std::array packet_buffers_t; friend class ServerConnection; public: @@ -125,6 +125,8 @@ namespace tc { uint16_t client_id = 0; ts::protocol::PacketIdManager _packet_id_manager; packet_buffers_t _packet_buffers; + std::array _packet_buffer_overflow{false}; + std::array incoming_generation_estimators{}; /* implementation is thread save */ uint8_t _packet_buffers_index = 0; diff --git a/native/serverconnection/test/js/main.ts b/native/serverconnection/test/js/main.ts index f3715ad..b8e935c 100644 --- a/native/serverconnection/test/js/main.ts +++ b/native/serverconnection/test/js/main.ts @@ -71,6 +71,7 @@ const do_connect = () => { console.log("Connected with state: %o (%s) ", error, connection.error_message(error)); if(error == 0) { + connection.send_command("handshakebegin", [{intention: 0, authentication_method: 2, client_nickname: "Native client test" }], []); console.dir(handle.ServerType); console.log("Server type: %o|%o", connection.server_type, handle.ServerType[connection.server_type]); connection.send_command("clientinit", [ @@ -95,14 +96,14 @@ const do_connect = () => { "client_nickname_phonetic":"", "client_default_token":"", "hwid":"123,456123123123", - return_code:91 + return_code: 91 } ], []); } }, identity_key: "MG4DAgeAAgEgAiBC9JsqB1am6vowj2obomMyxm1GLk8qyRoxpBkAdiVYxwIgWksaSk7eyVQovZwPZBuiYHARz/xQD5zBUBK6e63V7hICIQCZ2glHe3kV62iIRKpkV2lzZGZtfBPRMbwIcU9aE1EVsg==", - teamspeak: true /* used here to speed up the handshake process :) */ + teamspeak: false /* used here to speed up the handshake process :) */ }); connection.callback_voice_data = (buffer, client_id, codec_id, flag_head, packet_id) => { @@ -121,14 +122,29 @@ const do_connect = () => { } else if(command === "notifyservergroupclientlist") { console.log("Perm group"); return; + } else if(command === "notifypermissionlist") { + console.log("Received permission list"); + return; + } else if(command == "notifycliententerview") { + console.log("Enter client: %o", arguments1); + return; + } else if(command === "error") { + console.log("Received error: %o", arguments1); + return; } - console.log("Command: %s: %0", command, arguments1); + console.log("Command %s: %o", command, arguments1); if(command === "channellistfinished") { + //115 + //connection.send_command("clientgetvariables", [{ clid: 1 }], []); + //connection.send_command("channelsubscribeall", [], []); + connection.send_command("playlistsonglist", [{ playlist_id: '12' }], []); + /* setInterval(() => { connection.send_command("servergroupclientlist", [{ sgid: 2 }], []); connection.send_command("servergrouppermlist", [{ sgid: 2 }], []); }, 1000); + */ } };