Added a recovery strategy for too long packets

This commit is contained in:
WolverinDEV 2020-04-04 01:09:33 +02:00
parent e1b2752fb6
commit 94c0208d45
3 changed files with 51 additions and 9 deletions

View File

@ -248,9 +248,6 @@ void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) {
}
}
if(packet->type() == PacketTypeInfo::Command || packet->type() == PacketTypeInfo::CommandLow)
this->send_acknowledge(packet->packetId(), packet->type() == PacketTypeInfo::CommandLow);
{
unique_lock queue_lock(read_queue.buffer_lock);
@ -258,10 +255,12 @@ void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) {
//log_trace(category::connection, tr("Inserting packet {} with id {}"), packet->type().name(), packet->packetId());
if(!read_queue.insert_index(packet_id, std::forward<shared_ptr<ServerPacket>>(packet))) {
log_warn(category::connection, tr("Failed to insert ordered packet into queue. ({} | {} | {})"), packet_type.name(), read_queue.current_index(), packet_id);
return;
}
} else {
if(!read_queue.push_back(std::forward<shared_ptr<ServerPacket>>(packet))) {
log_warn(category::connection, tr("Failed to insert unordered packet into queue. ({} | {} | {})"), packet_type.name(), read_queue.current_index(), packet_id);
return;
/* return; dont stop here because we've to progress the packets */
} else {
read_queue.index_set(packet_id); /* may we've skipped one packet id */
@ -269,6 +268,10 @@ void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) {
}
}
/* only send an ack when we actually succeeded registering the packet */
if(packet_type == PacketTypeInfo::Command || packet_type == PacketTypeInfo::CommandLow)
this->send_acknowledge(packet_id, packet_type == PacketTypeInfo::CommandLow);
while(this->handle_packets());
}
@ -325,11 +328,32 @@ bool ProtocolHandler::handle_packets() {
current_packet = buffer->slot_value(sequence_length++);
if(current_packet) {
if(this->_packet_buffer_overflow[current_packet->type().type()]) {
auto& overflow_flag = this->_packet_buffer_overflow[current_packet->type().type()];
while(current_packet && !current_packet->has_flag(PacketFlag::Fragmented) && sequence_length < buffer->capacity())
current_packet = buffer->slot_value(sequence_length++);
while(buffer->front_set())
if(buffer->pop_front() == current_packet)
break;
overflow_flag = !current_packet || !current_packet->has_flag(PacketFlag::Fragmented);
if(!overflow_flag) {
log_info(category::connection, tr("Recovered successfully from too long packet."));
}
return false;
}
if((current_packet->type() == PacketTypeInfo::Command || current_packet->type() == PacketTypeInfo::CommandLow) && current_packet->has_flag(PacketFlag::Fragmented)) {
do {
if(sequence_length >= buffer->capacity()) {
log_warn(category::connection, tr("Received fragmented packets which have a too long order. Dropping queue, which will cause a client drop."));
buffer->clear();
log_error(category::connection, tr("Received fragmented packets which have a too long order (> {}). Ignoring that command and dropping its segments."), buffer->capacity());
while(buffer->front_set())
buffer->pop_front();
//TODO: Log to the client!
//this->handle->execute_callback_disconnect.call(tr("received a too long packet"), true);
//this->disconnect("received a too long packet");
this->_packet_buffer_overflow[current_packet->type().type()] = true;
return false;
}
current_packet = buffer->slot_value(sequence_length++);

View File

@ -53,7 +53,7 @@ namespace tc {
};
class ProtocolHandler {
typedef ts::protocol::PacketRingBuffer<ts::protocol::ServerPacket, 86> packet_buffer_t;
typedef ts::protocol::PacketRingBuffer<ts::protocol::ServerPacket, 88> packet_buffer_t;
typedef std::array<packet_buffer_t, 8> packet_buffers_t;
friend class ServerConnection;
public:
@ -125,6 +125,8 @@ namespace tc {
uint16_t client_id = 0;
ts::protocol::PacketIdManager _packet_id_manager;
packet_buffers_t _packet_buffers;
std::array<bool, 9> _packet_buffer_overflow{false};
std::array<ts::protocol::generation_estimator, 9> incoming_generation_estimators{}; /* implementation is thread save */
uint8_t _packet_buffers_index = 0;

View File

@ -71,6 +71,7 @@ const do_connect = () => {
console.log("Connected with state: %o (%s) ", error, connection.error_message(error));
if(error == 0) {
connection.send_command("handshakebegin", [{intention: 0, authentication_method: 2, client_nickname: "Native client test" }], []);
console.dir(handle.ServerType);
console.log("Server type: %o|%o", connection.server_type, handle.ServerType[connection.server_type]);
connection.send_command("clientinit", [
@ -102,7 +103,7 @@ const do_connect = () => {
},
identity_key: "MG4DAgeAAgEgAiBC9JsqB1am6vowj2obomMyxm1GLk8qyRoxpBkAdiVYxwIgWksaSk7eyVQovZwPZBuiYHARz/xQD5zBUBK6e63V7hICIQCZ2glHe3kV62iIRKpkV2lzZGZtfBPRMbwIcU9aE1EVsg==",
teamspeak: true /* used here to speed up the handshake process :) */
teamspeak: false /* used here to speed up the handshake process :) */
});
connection.callback_voice_data = (buffer, client_id, codec_id, flag_head, packet_id) => {
@ -121,14 +122,29 @@ const do_connect = () => {
} else if(command === "notifyservergroupclientlist") {
console.log("Perm group");
return;
} else if(command === "notifypermissionlist") {
console.log("Received permission list");
return;
} else if(command == "notifycliententerview") {
console.log("Enter client: %o", arguments1);
return;
} else if(command === "error") {
console.log("Received error: %o", arguments1);
return;
}
console.log("Command: %s: %0", command, arguments1);
console.log("Command %s: %o", command, arguments1);
if(command === "channellistfinished") {
//115
//connection.send_command("clientgetvariables", [{ clid: 1 }], []);
//connection.send_command("channelsubscribeall", [], []);
connection.send_command("playlistsonglist", [{ playlist_id: '12' }], []);
/*
setInterval(() => {
connection.send_command("servergroupclientlist", [{ sgid: 2 }], []);
connection.send_command("servergrouppermlist", [{ sgid: 2 }], []);
}, 1000);
*/
}
};