Fixed some crashes

This commit is contained in:
WolverinDEV 2020-02-18 11:53:32 +01:00
parent e500f8c0aa
commit 72661d17bc
2 changed files with 77 additions and 91 deletions

View File

@ -1,5 +1,5 @@
#include "AcknowledgeManager.h" #include "AcknowledgeManager.h"
#include <math.h> #include <cmath>
#include <misc/endianness.h> #include <misc/endianness.h>
using namespace ts; using namespace ts;
@ -8,30 +8,27 @@ using namespace ts::protocol;
using namespace std; using namespace std;
using namespace std::chrono; using namespace std::chrono;
AcknowledgeManager::AcknowledgeManager() {} AcknowledgeManager::AcknowledgeManager() = default;
AcknowledgeManager::~AcknowledgeManager() { AcknowledgeManager::~AcknowledgeManager() {
{ this->reset();
lock_guard<recursive_mutex> lock(this->entry_lock);
for(const auto& entry : this->entries)
if(entry->acknowledge_listener)
entry->acknowledge_listener->executionFailed("deleted");
this->entries.clear();
}
} }
void AcknowledgeManager::reset() { void AcknowledgeManager::reset() {
{ {
lock_guard<recursive_mutex> lock(this->entry_lock); std::unique_lock lock{this->entry_lock};
for(const auto& entry : this->entries) auto pending_entries = std::move(this->entries);
lock.unlock();
/* save because entries are not accessable anymore */
for(const auto& entry : pending_entries)
if(entry->acknowledge_listener) if(entry->acknowledge_listener)
entry->acknowledge_listener->executionFailed("reset"); entry->acknowledge_listener->executionFailed("reset");
this->entries.clear();
} }
} }
size_t AcknowledgeManager::awaiting_acknowledge() { size_t AcknowledgeManager::awaiting_acknowledge() {
lock_guard<recursive_mutex> lock(this->entry_lock); std::lock_guard lock(this->entry_lock);
return this->entries.size(); return this->entries.size();
} }
@ -54,7 +51,7 @@ void AcknowledgeManager::process_packet(ts::protocol::BasicPacket &packet) {
entry->acknowledged = false; entry->acknowledged = false;
entry->send_count = 1; entry->send_count = 1;
{ {
lock_guard<recursive_mutex> lock(this->entry_lock); std::lock_guard lock(this->entry_lock);
this->entries.push_front(std::move(entry)); this->entries.push_front(std::move(entry));
} }
} }
@ -62,25 +59,18 @@ void AcknowledgeManager::process_packet(ts::protocol::BasicPacket &packet) {
bool AcknowledgeManager::process_acknowledge(uint8_t packet_type, const pipes::buffer_view& payload, std::string& error) { bool AcknowledgeManager::process_acknowledge(uint8_t packet_type, const pipes::buffer_view& payload, std::string& error) {
if(payload.length() < 2) return false; if(payload.length() < 2) return false;
PacketType target_type = PacketType::UNDEFINED; PacketType target_type{packet_type == protocol::ACK_LOW ? PacketType::COMMAND_LOW : PacketType::COMMAND};
uint16_t target_id = 0; uint16_t target_id{be2le16((char*) payload.data_ptr())};
if(packet_type == protocol::ACK_LOW) target_type = PacketType::COMMAND_LOW;
else if(packet_type == protocol::ACK) target_type = PacketType::COMMAND;
target_id = be2le16((char*) payload.data_ptr());
//debugMessage(0, "Got ack for {} {}", target_type, target_id);
if(target_type == PacketType::UNDEFINED) {
error = "Invalid packet type (" + to_string(target_type) + ")";
return false;
}
std::shared_ptr<Entry> entry; std::shared_ptr<Entry> entry;
std::unique_ptr<threads::Future<bool>> ack_listener;
{ {
lock_guard<recursive_mutex> lock(this->entry_lock); std::lock_guard lock{this->entry_lock};
for(auto it = this->entries.begin(); it != this->entries.end(); it++) { for(auto it = this->entries.begin(); it != this->entries.end(); it++) {
if((*it)->packet_type == target_type && (*it)->packet_id == target_id) { if((*it)->packet_type == target_type && (*it)->packet_id == target_id) {
entry = *it; entry = *it;
ack_listener = std::move(entry->acknowledge_listener); /* move it out so nobody else could call it as well */
entry->send_count--; entry->send_count--;
if(entry->send_count == 0) if(entry->send_count == 0)
this->entries.erase(it); this->entries.erase(it);
@ -100,46 +90,44 @@ bool AcknowledgeManager::process_acknowledge(uint8_t packet_type, const pipes::b
} }
entry->acknowledged = true; entry->acknowledged = true;
if(entry->acknowledge_listener) if(ack_listener) ack_listener->executionSucceed(true);
entry->acknowledge_listener->executionSucceed(true);
entry->acknowledge_listener.reset();
return true; return true;
} }
ssize_t AcknowledgeManager::execute_resend(const system_clock::time_point& now , std::chrono::system_clock::time_point &next_resend,std::deque<pipes::buffer>& buffers, string& error) { ssize_t AcknowledgeManager::execute_resend(const system_clock::time_point& now , std::chrono::system_clock::time_point &next_resend,std::deque<pipes::buffer>& buffers, string& error) {
ssize_t resend_count = 0; size_t resend_count{0};
deque<shared_ptr<Entry>> need_resend; vector<shared_ptr<Entry>> need_resend;
{ {
deque<shared_ptr<Entry>> erase; bool cleanup{false};
std::lock_guard lock{this->entry_lock};
need_resend.resize(this->entries.size());
lock_guard<recursive_mutex> lock(this->entry_lock);
for (auto &entry : this->entries) { for (auto &entry : this->entries) {
if(!entry->acknowledged && entry->next_resend <= now) {
entry->resend_period = entry->resend_period + milliseconds((int) ceil(this->average_response * 2));
if(entry->resend_period.count() > 1000)
entry->resend_period = milliseconds(1000);
else if(entry->resend_period.count() < 25)
entry->resend_period = milliseconds(25);
entry->next_resend = now + entry->resend_period;
need_resend.push_front(entry);
}
if(entry->acknowledged) { if(entry->acknowledged) {
if(entry->next_resend + entry->resend_period <= now) { //Timeout for may (more acknowledges) if(entry->next_resend + entry->resend_period <= now) { // Some resends are lost. So we just drop it after time
erase.push_back(entry); entry.reset();
cleanup = true;
} }
} else { } else {
if(entry->next_resend <= now) {
entry->resend_period = entry->resend_period + milliseconds{(int) ceil(this->average_response * 2)};
if(entry->resend_period.count() > 1000)
entry->resend_period = milliseconds(1000);
else if(entry->resend_period.count() < 25)
entry->resend_period = milliseconds(25);
entry->next_resend = now + entry->resend_period;
need_resend.push_back(entry);
}
if(next_resend > entry->next_resend) if(next_resend > entry->next_resend)
next_resend = entry->next_resend; next_resend = entry->next_resend;
} }
} }
for(const auto& e : erase) { if(cleanup) {
auto it = find(this->entries.begin(), this->entries.end(), e); this->entries.erase(std::remove_if(this->entries.begin(), this->entries.end(),
if(it != this->entries.end()) [](const auto& entry) { return !entry; }), this->entries.end());
this->entries.erase(it);
} }
} }

View File

@ -4,48 +4,46 @@
#include <protocol/Packet.h> #include <protocol/Packet.h>
#define DEBUG_ACKNOWLEDGE #define DEBUG_ACKNOWLEDGE
namespace ts { namespace ts::connection {
namespace connection { class VoiceClientConnection;
class VoiceClientConnection; class AcknowledgeManager {
class AcknowledgeManager { struct Entry {
struct Entry { uint16_t packet_id = 0;
uint16_t packet_id = 0; uint8_t packet_type = 0xFF;
uint8_t packet_type = 0xFF; uint8_t resend_count = 0;
uint8_t resend_count = 0; bool acknowledged : 1;
bool acknowledged : 1; uint8_t send_count : 7;
uint8_t send_count : 7;
pipes::buffer buffer; pipes::buffer buffer;
std::chrono::system_clock::time_point first_send; std::chrono::system_clock::time_point first_send;
std::chrono::system_clock::time_point next_resend; std::chrono::system_clock::time_point next_resend;
std::chrono::milliseconds resend_period; std::chrono::milliseconds resend_period;
std::unique_ptr<threads::Future<bool>> acknowledge_listener; std::unique_ptr<threads::Future<bool>> acknowledge_listener;
};
public:
AcknowledgeManager();
virtual ~AcknowledgeManager();
size_t awaiting_acknowledge();
void reset();
void process_packet(ts::protocol::BasicPacket& /* packet */);
bool process_acknowledge(uint8_t packet_type, const pipes::buffer_view& /* payload */, std::string& /* error */);
ssize_t execute_resend(
const std::chrono::system_clock::time_point& /* now */,
std::chrono::system_clock::time_point& /* next resend */,
std::deque<pipes::buffer>& /* buffers to resend */,
std::string& /* error */
);
private:
std::recursive_mutex entry_lock;
std::deque<std::shared_ptr<Entry>> entries;
std::chrono::milliseconds resend_delay{500};
double average_response = 20;
}; };
} public:
AcknowledgeManager();
virtual ~AcknowledgeManager();
size_t awaiting_acknowledge();
void reset();
void process_packet(ts::protocol::BasicPacket& /* packet */);
bool process_acknowledge(uint8_t packet_type, const pipes::buffer_view& /* payload */, std::string& /* error */);
ssize_t execute_resend(
const std::chrono::system_clock::time_point& /* now */,
std::chrono::system_clock::time_point& /* next resend */,
std::deque<pipes::buffer>& /* buffers to resend */,
std::string& /* error */
);
private:
std::mutex entry_lock;
std::deque<std::shared_ptr<Entry>> entries;
std::chrono::milliseconds resend_delay{500};
double average_response = 20;
};
} }