Reschedule packet send on `EAGAIN`

This commit is contained in:
WolverinDEV 2021-02-01 20:59:03 +01:00
parent e2335becd7
commit 9af0a76ed6
4 changed files with 45 additions and 27 deletions

View File

@ -151,8 +151,9 @@ void FileServerHandler::callback_transfer_statistics(const std::shared_ptr<trans
client->getConnectionStatistics()->logFileTransferOut(statistics.delta_file_bytes_transferred);
}
if(client->getType() == ClientType::CLIENT_TEAMSPEAK)
if(client->getType() == ClientType::CLIENT_TEAMSPEAK) {
return; /* TS3 does not know this notify */
}
ts::command_builder notify{"notifyfiletransferprogress"};
notify.put_unchecked(0, "clientftfid", transfer->client_transfer_id);
@ -176,7 +177,9 @@ void FileServerHandler::callback_transfer_started(const std::shared_ptr<transfer
if(!server) return; /* well that's bad */
auto client = server->find_client_by_id(transfer->client_id);
if(!client || client->getUid() != transfer->client_unique_id) return;
if(!client || client->getUid() != transfer->client_unique_id) {
return;
}
ts::command_builder notify{"notifyfiletransferstarted"};
@ -186,14 +189,19 @@ void FileServerHandler::callback_transfer_started(const std::shared_ptr<transfer
void FileServerHandler::callback_transfer_finished(const std::shared_ptr<transfer::Transfer> &transfer) {
auto server = this->instance_->getVoiceServerManager()->findServerById(transfer->server->server_id());
if(!server) return; /* well that's bad */
if(!server) {
return; /* well that's bad */
}
auto client = server->find_client_by_id(transfer->client_id);
if(!client || client->getUid() != transfer->client_unique_id) return;
if(client->getType() == ClientType::CLIENT_TEAMSPEAK)
if(!client || client->getUid() != transfer->client_unique_id) {
return;
}
if(client->getType() == ClientType::CLIENT_TEAMSPEAK) {
return;
}
ts::command_builder notify{"notifystatusfiletransfer"};

View File

@ -35,10 +35,10 @@ void PacketEncoder::reset() {
{
std::lock_guard wlock{this->write_queue_mutex};
whead = std::exchange(this->encrypt_queue_head, nullptr);
rhead = std::exchange(this->resend_queue_head, nullptr);
rhead = std::exchange(this->send_queue_head, nullptr);
this->encrypt_queue_tail = &this->encrypt_queue_head;
this->send_queue_tail = &this->resend_queue_head;
this->send_queue_tail = &this->send_queue_head;
}
while(whead) {
@ -238,7 +238,7 @@ void PacketEncoder::encrypt_pending_packets() {
auto packet = packets_head;
auto packet_tail = packet;
while(packet) {
this->prepare_outgoing_packet(packet);
this->encrypt_outgoing_packet(packet);
packet = packet->next;
if(packet) {
@ -253,7 +253,7 @@ void PacketEncoder::encrypt_pending_packets() {
}
}
bool PacketEncoder::prepare_outgoing_packet(ts::protocol::OutgoingServerPacket *packet) {
bool PacketEncoder::encrypt_outgoing_packet(ts::protocol::OutgoingServerPacket *packet) {
if(packet->type_and_flags & protocol::PacketFlag::Unencrypted) {
this->crypt_handler_->write_default_mac(packet->mac);
} else {
@ -284,18 +284,18 @@ bool PacketEncoder::prepare_outgoing_packet(ts::protocol::OutgoingServerPacket *
}
PacketEncoder::BufferPopResult PacketEncoder::pop_write_buffer(protocol::OutgoingServerPacket *&result) {
bool need_prepare_packet{false}, more_packets;
bool need_encrypt{false}, more_packets;
{
std::lock_guard wlock{this->write_queue_mutex};
if(this->resend_queue_head) {
result = this->resend_queue_head;
if(this->send_queue_head) {
result = this->send_queue_head;
if(result->next) {
assert(this->send_queue_tail != &result->next);
this->resend_queue_head = result->next;
this->send_queue_head = result->next;
} else {
assert(this->send_queue_tail == &result->next);
this->resend_queue_head = nullptr;
this->send_queue_tail = &this->resend_queue_head;
this->send_queue_head = nullptr;
this->send_queue_tail = &this->send_queue_head;
}
} else if(this->encrypt_queue_head) {
result = this->encrypt_queue_head;
@ -307,21 +307,30 @@ PacketEncoder::BufferPopResult PacketEncoder::pop_write_buffer(protocol::Outgoin
this->encrypt_queue_head = nullptr;
this->encrypt_queue_tail = &this->encrypt_queue_head;
}
need_prepare_packet = true;
need_encrypt = true;
} else {
return BufferPopResult::DRAINED;
}
result->next = nullptr;
more_packets = this->resend_queue_head != nullptr || this->encrypt_queue_head != nullptr;
more_packets = this->send_queue_head != nullptr || this->encrypt_queue_head != nullptr;
}
if(need_prepare_packet) {
this->prepare_outgoing_packet(result);
if(need_encrypt) {
this->encrypt_outgoing_packet(result);
}
return more_packets ? BufferPopResult::MORE_AVAILABLE : BufferPopResult::DRAINED;
}
void PacketEncoder::reenqueue_failed_buffer(protocol::OutgoingServerPacket *packet) {
std::lock_guard wlock{this->write_queue_mutex};
if(!this->send_queue_head) {
this->send_queue_tail = &packet->next;
}
packet->next = this->send_queue_head;
this->send_queue_head = packet;
}
void PacketEncoder::execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next) {
std::deque<std::shared_ptr<connection::AcknowledgeManager::Entry>> buffers{};
std::string error{};
@ -367,7 +376,7 @@ bool PacketEncoder::wait_empty_write_and_prepare_queue(std::chrono::time_point<s
if(this->encrypt_queue_head)
goto _wait;
if(this->resend_queue_head)
if(this->send_queue_head)
goto _wait;
}
break;

View File

@ -55,6 +55,7 @@ namespace ts::server::server::udp {
/* if the result is true, ownership has been transferred */
BufferPopResult pop_write_buffer(protocol::OutgoingServerPacket*& /* packet */);
void reenqueue_failed_buffer(protocol::OutgoingServerPacket* /* packet */);
[[nodiscard]] inline auto& acknowledge_manager() { return this->acknowledge_manager_; }
@ -74,8 +75,8 @@ namespace ts::server::server::udp {
connection::AcknowledgeManager acknowledge_manager_{};
spin_mutex write_queue_mutex{};
protocol::OutgoingServerPacket* resend_queue_head{nullptr};
protocol::OutgoingServerPacket** send_queue_tail{&resend_queue_head};
protocol::OutgoingServerPacket* send_queue_head{nullptr};
protocol::OutgoingServerPacket** send_queue_tail{&send_queue_head};
protocol::OutgoingServerPacket* encrypt_queue_head{nullptr};
protocol::OutgoingServerPacket** encrypt_queue_tail{&encrypt_queue_head};
@ -85,6 +86,6 @@ namespace ts::server::server::udp {
/* thread save function */
bool prepare_outgoing_packet(protocol::OutgoingServerPacket* /* packet */);
bool encrypt_outgoing_packet(protocol::OutgoingServerPacket* /* packet */);
};
}

View File

@ -438,8 +438,8 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle)
ssize_t res = write_datagram(io, client->remote_address, &client->connection->remote_address_info_, packet->packet_length(), packet->packet_data());
if(res != packet->packet_length()) {
if(errno == EAGAIN) {
logCritical(voice_server->server->getServerId(), "Failed to write datagram packet for client {} (EAGAIN).", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()));
packet->unref();
client->connection->packet_encoder().reenqueue_failed_buffer(packet);
logTrace(voice_server->server->getServerId(), "Failed to write datagram packet for client {} (EAGAIN). Rescheduling packet.", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()));
return;
} else if(errno == EINVAL || res == -0xFEB) {
/* needs more debug */