SOme updates

This commit is contained in:
WolverinDEV 2021-03-21 23:51:36 +01:00
parent eef0144e77
commit 8dde5b1c23
4 changed files with 34 additions and 33 deletions

View File

@ -279,25 +279,33 @@ namespace std {
};
*/
/**
* Test if a `std::shared_mutex` is unique locked by trying to lock it in shared mode.
* @tparam T
* @param mutex
* @return
*/
template <typename T>
inline bool mutex_locked(T& mutex) {
/* std::shared_mutex can be recursively unique locked?? */
return true;
try {
unique_lock<T> lock_try(mutex, try_to_lock); /* should throw EDEADLK */
std::unique_lock<T> lock_try(mutex, try_to_lock); /* should throw EDEADLK */
return false;
} catch(const std::system_error& ex) {
return ex.code() == errc::resource_deadlock_would_occur;
}
}
/**
* Test if a `std::shared_mutex` is shared (or unique) locked by try locking it in unique mode
* @tparam T
* @param mutex
* @return
*/
template <typename T>
inline bool mutex_shared_locked(T& mutex) {
return true;
try {
shared_lock<T> lock_try(mutex, try_to_lock); /* should throw EDEADLK */
return false;
} catch(const std::system_error& ex) {
return ex.code() == errc::resource_deadlock_would_occur;
}
//return mutex_locked(mutex);
}
}

View File

@ -95,46 +95,41 @@ bool AcknowledgeManager::process_acknowledge(uint8_t packet_type, uint16_t targe
}
void AcknowledgeManager::execute_resend(const system_clock::time_point& now , std::chrono::system_clock::time_point &next_resend,std::deque<std::shared_ptr<Entry>>& buffers) {
vector<shared_ptr<Entry>> resend_failed;
{
bool cleanup{false};
std::lock_guard lock{this->entry_lock};
resend_failed.reserve(this->entries.size());
std::deque<std::shared_ptr<Entry>> resend_failed;
for (auto &entry : this->entries) {
if (entry->acknowledged) {
{
std::lock_guard lock{this->entry_lock};
this->entries.erase(std::remove_if(this->entries.begin(), this->entries.end(), [&](std::shared_ptr<Entry>& entry) {
if(entry->acknowledged) {
if (entry->next_resend + std::chrono::milliseconds{(int64_t) ceil(this->rto * 4)} <= now) {
/* Some resends are lost. So we just drop it after time */
entry.reset();
cleanup = true;
return true;
}
} else {
if (entry->next_resend <= now) {
if (entry->resend_count > 15 && entry->first_send + seconds(15) < now) {
resend_failed.push_back(std::move(entry)); /* transfer the ownership */
cleanup = true;
continue;
/* packet resend seems to have failed */
resend_failed.push_back(std::move(entry));
return true;
} else {
entry->next_resend =
now + std::chrono::milliseconds{(int64_t) std::min(ceil(this->rto), 1500.f)};
entry->next_resend = now + std::chrono::milliseconds{(int64_t) std::min(ceil(this->rto), 1500.f)};
buffers.push_back(entry);
//entry->resend_count++; /* this MUST be incremented by the result handler (resend may fails) */
entry->send_count++;
}
}
if (next_resend > entry->next_resend)
if (next_resend > entry->next_resend) {
next_resend = entry->next_resend;
}
}
}
if (cleanup) {
this->entries.erase(std::remove_if(this->entries.begin(), this->entries.end(),
[](const auto &entry) { return !entry; }), this->entries.end());
}
return false;
}), this->entries.end());
}
for(const auto& failed : resend_failed)
for(const auto& failed : resend_failed) {
this->callback_resend_failed(this->callback_data, failed);
}
}
/* we're not taking the clock granularity into account because its nearly 1ms and it would only add more branches */

View File

@ -8,8 +8,6 @@
#define DEBUG_ACKNOWLEDGE
namespace ts::connection {
class VoiceClientConnection;
class AcknowledgeManager {
public:
struct Entry {
@ -32,7 +30,7 @@ namespace ts::connection {
AcknowledgeManager();
virtual ~AcknowledgeManager();
size_t awaiting_acknowledge();
[[nodiscard]] size_t awaiting_acknowledge();
void reset();
void process_packet(uint8_t /* packet type */, uint32_t /* full packet id */, void* /* packet ptr */, std::unique_ptr<std::function<void(bool)>> /* ack listener */);

View File

@ -54,7 +54,7 @@ namespace ts::protocol {
public:
/* direct function calls are better optimized out */
typedef void(*callback_decoded_packet_t)(void* /* cb argument */, const protocol::PacketParser&);
typedef void(*callback_decoded_command_t)(void* /* cb argument */, ReassembledCommand*& /* command */); /* must move the command, else it gets freed*/
typedef void(*callback_decoded_command_t)(void* /* cb argument */, ReassembledCommand*& /* command */); /* must move the command, else it gets freed */
typedef void(*callback_send_acknowledge_t)(void* /* cb argument */, uint16_t /* packet id */, bool /* is command low */);
explicit PacketDecoder(connection::CryptHandler* /* crypt handler */, bool /* is server */);