Teaspeak-Server/file/local_server/LocalFileTransferClientWorker.cpp

207 lines
9.4 KiB
C++
Raw Normal View History

2020-05-07 15:28:15 -04:00
//
// Created by WolverinDEV on 04/05/2020.
//
#include <cassert>
#include <event2/event.h>
#include <log/LogUtils.h>
#include "./LocalFileProvider.h"
using namespace ts::server::file;
using namespace ts::server::file::transfer;
/*
 * Spawns the disconnect worker thread which executes dispatch_loop_client_worker().
 *
 * The worker must not be running already; this is only verified via assert,
 * so calling this twice is a programming error.
 *
 * Returns ClientWorkerStartResult::SUCCESS.
 */
ClientWorkerStartResult LocalFileTransfer::start_client_worker() {
    auto& worker = this->disconnect;
    assert(!worker.active);

    /* Raise the flag before the thread starts, the loop condition of the worker reads it. */
    worker.active = true;

    std::thread dispatcher(&LocalFileTransfer::dispatch_loop_client_worker, this);
    worker.dispatch_thread = std::move(dispatcher);

    return ClientWorkerStartResult::SUCCESS;
}
/*
 * Stops the disconnect worker thread and waits until it has exited.
 *
 * Does nothing when the worker is not active.
 * Emits a warning if there are still registered clients once the worker is gone.
 */
void LocalFileTransfer::shutdown_client_worker() {
    auto& worker = this->disconnect;
    if(worker.active) {
        /* Clear the flag first, then wake the worker so it does not sit out its wait interval. */
        worker.active = false;
        worker.notify_cv.notify_all();

        if(worker.dispatch_thread.joinable()) {
            worker.dispatch_thread.join();
        }

        std::unique_lock tlock{this->transfers_mutex};
        const bool has_clients = !this->transfers_.empty();
        if(has_clients) {
            logWarning(LOG_FT, "Shutting down disconnect worker even thou we still have some active clients. This could cause memory leaks.");
        }
    }
}
/*
 * Initiates the disconnect of a file transfer client.
 *
 * client:     The client which should be disconnected.
 * state_lock: Lock which must be owned by the caller (asserted).
 *             NOTE(review): presumably the lock on client->state_mutex - confirm against callers.
 * flush:      true  -> enter STATE_DISCONNECTING and let the remaining data drain
 *                      (uploads: hand over to the disk IO queue, downloads: keep writing to the network).
 *             false -> enter STATE_DISCONNECTED right away and wake up the disconnect worker.
 *
 * Nothing happens if the client is already disconnected, or if a flush has been
 * requested while the client is already disconnecting.
 */
void LocalFileTransfer::disconnect_client(const std::shared_ptr<FileClient> &client, std::unique_lock<std::shared_mutex>& state_lock, bool flush) {
    assert(state_lock.owns_lock());

    const bool already_disconnected = client->state == FileClient::STATE_DISCONNECTED;
    const bool already_flushing = flush && client->state == FileClient::STATE_DISCONNECTING;
    if(already_disconnected || already_flushing) {
        return; /* shall NOT happen */
    }

    /* Removes an event without blocking; unset events are skipped. */
    const auto remove_event = [](const auto& ev) {
        if(ev) event_del_noblock(ev);
    };
    const auto remove_all_events = [&] {
        remove_event(client->networking.event_read);
        remove_event(client->networking.event_write);
        remove_event(client->networking.event_throttle);
    };

    client->timings.disconnecting = std::chrono::system_clock::now();

    if(!flush) {
        client->state = FileClient::STATE_DISCONNECTED;
        remove_all_events();
        /* the disconnect worker picks up and finalizes disconnected clients */
        this->disconnect.notify_cv.notify_one();
        return;
    }

    client->state = FileClient::STATE_DISCONNECTING;
    switch(client->transfer->direction) {
        case Transfer::DIRECTION_UPLOAD:
            remove_all_events();
            /* no direct timeout needed here, we're just flushing the disk */
            this->enqueue_disk_io(client);
            break;

        case Transfer::DIRECTION_DOWNLOAD:
            /* nothing more to read, but the pending bytes still have to be written */
            remove_event(client->networking.event_read);
            client->add_network_write_event_nolock(false);
            /* max flush 10 seconds */
            client->networking.disconnect_timeout = std::chrono::system_clock::now() + std::chrono::seconds{10};
            break;

        default:
            break;
    }
}
void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
auto provider = reinterpret_cast<LocalFileTransfer*>(ptr_transfer);
while(provider->disconnect.active) {
{
std::unique_lock dlock{provider->disconnect.mutex};
provider->disconnect.notify_cv.wait_for(dlock, std::chrono::seconds{1});
}
/* run the disconnect worker at least once before exiting */
/* transfer statistics */
{
std::unique_lock tlock{provider->transfers_mutex};
auto transfers = provider->transfers_;
tlock.unlock();
for(const auto& transfer : transfers) {
switch(transfer->state) {
case FileClient::STATE_TRANSFERRING:
break;
case FileClient::STATE_DISCONNECTING:
if(transfer->transfer && transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD)
break; /* we're still transferring (sending data) */
default:
continue;
}
provider->report_transfer_statistics(transfer->shared_from_this());
}
}
{
std::deque<std::shared_ptr<Transfer>> timeouted_transfers{};
{
std::unique_lock tlock{provider->transfers_mutex};
auto now = std::chrono::system_clock::now();
std::copy_if(provider->pending_transfers.begin(), provider->pending_transfers.end(), std::back_inserter(timeouted_transfers), [&](const std::shared_ptr<Transfer>& t) {
return t->initialized_timestamp + std::chrono::seconds{100} < now; //FIXME: Decrease to 10 again!
});
provider->pending_transfers.erase(std::remove_if(provider->pending_transfers.begin(), provider->pending_transfers.end(), [&](const auto& t) {
return std::find(timeouted_transfers.begin(), timeouted_transfers.end(), t) != timeouted_transfers.end();
}), provider->pending_transfers.end());
}
for(const auto& pt : timeouted_transfers) {
if(auto callback{provider->callback_transfer_aborted}; callback)
callback(pt, { TransferError::TRANSFER_TIMEOUT, "" });
}
if(!timeouted_transfers.empty())
logMessage(LOG_FT, "Removed {} pending transfers because no request has been made for them.", timeouted_transfers.size());
}
{
std::deque<std::shared_ptr<FileClient>> disconnected_clients{};
{
std::unique_lock tlock{provider->transfers_mutex};
auto now = std::chrono::system_clock::now();
std::copy_if(provider->transfers_.begin(), provider->transfers_.end(), std::back_inserter(disconnected_clients), [&](const std::shared_ptr<FileClient>& t) {
std::shared_lock slock{t->state_mutex};
if(t->state == FileClient::STATE_DISCONNECTED) {
return true;
} else if(t->state == FileClient::STATE_AWAITING_KEY) {
return t->timings.connected + std::chrono::seconds{10} < now;
} else if(t->state == FileClient::STATE_TRANSFERRING) {
assert(t->transfer);
if(t->transfer->direction == Transfer::DIRECTION_UPLOAD) {
return false; //FIXME: Due to debugging reasons
return t->timings.last_read + std::chrono::seconds{5} < now;
} else if(t->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
return t->timings.last_write + std::chrono::seconds{5} < now;
}
} else if(t->state == FileClient::STATE_DISCONNECTING) {
return t->timings.disconnecting + std::chrono::seconds{30} < now;
}
return false;
});
provider->transfers_.erase(std::remove_if(provider->transfers_.begin(), provider->transfers_.end(), [&](const auto& t) {
return std::find(disconnected_clients.begin(), disconnected_clients.end(), t) != disconnected_clients.end();
}), provider->transfers_.end());
}
for(auto& client : disconnected_clients) {
switch(client->state) {
case FileClient::STATE_AWAITING_KEY:
logMessage(LOG_FT, "{} Received no key. Dropping client.", client->log_prefix());
break;
case FileClient::STATE_TRANSFERRING:
logMessage(LOG_FT, "{} Networking timeout. Dropping client", client->log_prefix());
if(auto callback{provider->callback_transfer_aborted}; callback)
callback(client->transfer, { TransferError::TRANSFER_TIMEOUT, "" });
break;
case FileClient::STATE_DISCONNECTING:
logMessage(LOG_FT, "{} Failed to flush connection. Dropping client", client->log_prefix());
break;
default:
break;
}
{
std::unique_lock slock{client->state_mutex};
client->state = FileClient::STATE_DISCONNECTED;
provider->finalize_file_io(client, slock);
provider->finalize_networking(client, slock);
}
debugMessage(LOG_FT, "{} Destroying transfer.", client->log_prefix());
}
}
}
}
/*
 * Publishes the current statistics of a client via callback_transfer_statistics.
 *
 * client: The client whose counters should be reported. client->transfer is
 *         dereferenced without a check and must therefore be set.
 *
 * Does nothing if no statistics callback is registered.
 * As a side effect the "last_*" counters of the client are advanced to the reported
 * values, so each report carries the delta since the previous report.
 */
void LocalFileTransfer::report_transfer_statistics(const std::shared_ptr<FileClient> &client) {
    auto callback{this->callback_transfer_statistics};
    if(!callback) return;

    auto& counters = client->statistics;

    /* Take one snapshot of the live counters; everything below derives from this snapshot. */
    TransferStatistics stats{};
    stats.network_bytes_send = counters.network_bytes_send;
    stats.network_bytes_received = counters.network_bytes_received;
    stats.file_bytes_transferred = counters.file_bytes_transferred;

    /* delta = snapshot - previously reported value; the snapshot becomes the new "previous" value */
    stats.delta_network_bytes_received = stats.network_bytes_received - std::exchange(counters.last_network_bytes_received, stats.network_bytes_received);
    stats.delta_network_bytes_send = stats.network_bytes_send - std::exchange(counters.last_network_bytes_send, stats.network_bytes_send);
    stats.delta_file_bytes_transferred = stats.file_bytes_transferred - std::exchange(counters.last_file_bytes_transferred, stats.file_bytes_transferred);

    stats.file_start_offset = client->transfer->file_offset;
    /* Use the snapshot instead of re-reading the live counter so that the offset always matches file_bytes_transferred. */
    stats.file_current_offset = stats.file_bytes_transferred + client->transfer->file_offset;
    stats.file_total_size = client->transfer->expected_file_size;

    callback(client->transfer, stats);
}