Fixed networking flush algorithm for the file transfer

This commit is contained in:
WolverinDEV 2020-06-11 13:08:45 +02:00
parent 6e1323cc23
commit cbb0bd6864
5 changed files with 136 additions and 129 deletions

View File

@ -178,11 +178,10 @@ namespace ts::server::file {
size_t bytes{0};
bool buffering_stopped{false};
bool write_disconnected{false};
Buffer* buffer_head{nullptr};
Buffer** buffer_tail{&buffer_head};
bool flushed{false};
} network_buffer{};
struct {
@ -190,11 +189,10 @@ namespace ts::server::file {
size_t bytes{0};
bool buffering_stopped{false};
bool write_disconnected{false};
Buffer* buffer_head{nullptr};
Buffer** buffer_tail{&buffer_head};
bool flushed{false};
} disk_buffer{};
struct {
@ -258,6 +256,10 @@ namespace ts::server::file {
bool enqueue_network_buffer_bytes(const void* /* buffer */, size_t /* length */);
bool enqueue_disk_buffer_bytes(const void* /* buffer */, size_t /* length */);
/* these function clear the buffers and set the write disconnected flags to true so no new buffers will be enqueued */
void flush_network_buffer();
void flush_disk_buffer();
[[nodiscard]] inline std::string log_prefix() const { return "[" + net::to_string(this->networking.address) + "]"; }
};
@ -462,7 +464,7 @@ namespace ts::server::file {
void shutdown_disk_io();
void shutdown_client_worker();
void disconnect_client(const std::shared_ptr<FileClient>& /* client */, std::unique_lock<std::shared_mutex>& /* state lock */, bool /* flush */);
void disconnect_client(const std::shared_ptr<FileClient>& /* client */, std::unique_lock<std::shared_mutex>& /* state lock */, bool /* flush network */);
[[nodiscard]] NetworkInitializeResult initialize_networking(const std::shared_ptr<FileClient>& /* client */, int /* file descriptor */);
/* might block 'till all IO operations have been succeeded */
@ -477,6 +479,8 @@ namespace ts::server::file {
void enqueue_disk_io(const std::shared_ptr<FileClient>& /* client */);
void execute_disk_io(const std::shared_ptr<FileClient>& /* client */);
void test_disconnecting_state(const std::shared_ptr<FileClient>& /* client */);
[[nodiscard]] TransferUploadRawResult handle_transfer_upload_raw(const std::shared_ptr<FileClient>& /* client */, const char * /* buffer */, size_t /* length */, size_t* /* bytes written */);
[[nodiscard]] TransferUploadHTTPResult handle_transfer_upload_http(const std::shared_ptr<FileClient>& /* client */, const char * /* buffer */, size_t /* length */);

View File

@ -28,22 +28,11 @@ void transfer::free_buffer(Buffer* buffer) {
}
FileClient::~FileClient() {
{
auto head = this->network_buffer.buffer_head;
while (head) {
auto next = head->next;
free_buffer(head);
head = next;
}
}
{
auto head = this->disk_buffer.buffer_head;
while (head) {
auto next = head->next;
free_buffer(head);
head = next;
}
}
this->flush_network_buffer();
this->flush_disk_buffer();
assert(!this->disk_buffer.buffer_head);
assert(!this->network_buffer.buffer_head);
assert(!this->file.file_descriptor);
assert(!this->file.currently_processing);

View File

@ -67,6 +67,26 @@ void LocalFileTransfer::disconnect_client(const std::shared_ptr<FileClient> &cli
#undef del_ev_noblock
}
void LocalFileTransfer::test_disconnecting_state(const std::shared_ptr<FileClient> &client) {
if(client->state != FileClient::STATE_DISCONNECTING)
return;
{
std::lock_guard db_lock{client->disk_buffer.mutex};
std::lock_guard nb_lock{client->network_buffer.mutex};
if(client->disk_buffer.bytes > 0)
return;
if(client->network_buffer.bytes > 0)
return;
}
debugMessage(LOG_FT, "{} Disk and network buffers are flushed.", client->log_prefix());
std::unique_lock s_lock{client->state_mutex};
this->disconnect_client(client, s_lock, false);
}
void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
auto provider = reinterpret_cast<LocalFileTransfer*>(ptr_transfer);

View File

@ -107,13 +107,37 @@ bool FileClient::enqueue_disk_buffer_bytes(const void *snd_buffer, size_t size)
size_t buffer_size;
{
std::lock_guard block{this->disk_buffer.mutex};
if(this->disk_buffer.write_disconnected)
goto write_disconnected;
*this->disk_buffer.buffer_tail = tbuffer;
this->disk_buffer.buffer_tail = &tbuffer->next;
buffer_size = (this->disk_buffer.bytes += size);
}
return buffer_size > TRANSFER_MAX_CACHED_BYTES;
write_disconnected:
free_buffer(tbuffer);
return false;
}
void FileClient::flush_disk_buffer() {
Buffer* current_head;
{
std::lock_guard block{this->disk_buffer.mutex};
this->disk_buffer.write_disconnected = true;
this->disk_buffer.bytes = 0;
current_head = std::exchange(this->disk_buffer.buffer_head, nullptr);
this->disk_buffer.buffer_tail = &this->disk_buffer.buffer_head;
}
while(current_head) {
auto next = current_head->next;
free_buffer(current_head);
current_head = next;
}
}
FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr<FileClient> &transfer) {
@ -226,8 +250,6 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr
}
}
} else if(transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
transfer->disk_buffer.flushed = true; /* we're not using this buffer, so it will be flushed all the times */
auto file_size = lseek(file_data.file_descriptor, 0, SEEK_END);
if(file_size != transfer->transfer->expected_file_size) {
logWarning(LOG_FT, "{} Expected target file to be of size {}, but file is actually of size {}", transfer->log_prefix(), transfer->transfer->expected_file_size, file_size);
@ -387,6 +409,8 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
this->invoke_aborted_callback(client, { TransferError::UNEXPECTED_DISK_EOF, strerror(errno) });
client->flush_disk_buffer();
{
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client, slock, true);
@ -404,6 +428,8 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
client->log_prefix(), offset_written, client->transfer->expected_file_size, aoffset);
this->invoke_aborted_callback(client, { TransferError::DISK_IO_ERROR, strerror(errno) });
client->flush_disk_buffer();
{
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client, slock, true);
@ -442,22 +468,7 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
if(buffer_left_size > 0) {
this->enqueue_disk_io(client);
} else if(client->state == FileClient::STATE_DISCONNECTING) {
{
std::lock_guard nb_lock{client->network_buffer.mutex};
{
std::lock_guard db_lock{client->disk_buffer.mutex};
if(std::exchange(client->disk_buffer.flushed, true))
return;
}
if(!client->network_buffer.flushed) {
logTrace(LOG_FT, "{} Disk IO has been flushed, awaiting network buffer flush.", client->log_prefix());
return;
}
logTrace(LOG_FT, "{} Disk IO and network buffer have been flushed.", client->log_prefix());
}
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, false);
this->test_disconnecting_state(client);
}
if(client->state == FileClient::STATE_TRANSFERRING && buffer_left_size < TRANSFER_MAX_CACHED_BYTES / 2) {
@ -466,7 +477,10 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
client->add_network_read_event(false);
}
} else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
if(client->state == FileClient::STATE_DISCONNECTING) return;
if(client->state == FileClient::STATE_DISCONNECTING) {
client->flush_disk_buffer(); /* just in case */
return;
}
while(true) {
constexpr auto buffer_capacity{4096};
@ -487,11 +501,6 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
this->invoke_aborted_callback(client, { TransferError::UNEXPECTED_DISK_EOF, "" });
}
{
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client, slock, true);
}
} else {
if(errno == EAGAIN) {
this->enqueue_disk_io(client);
@ -501,11 +510,10 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
logWarning(LOG_FT, "{} Failed to read from file {} ({}/{}). Aborting transfer.", client->log_prefix(), client->transfer->absolute_file_path, errno, strerror(errno));
this->invoke_aborted_callback(client, { TransferError::DISK_IO_ERROR, strerror(errno) });
{
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client, slock, true);
}
}
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client, slock, true);
return;
} else {
auto buffer_full = client->send_file_bytes(buffer, read);

View File

@ -12,6 +12,7 @@
#include "./LocalFileProvider.h"
#include "./duration_utils.h"
#include "HTTPUtils.h"
#include "LocalFileProvider.h"
#if defined(TCP_CORK) && !defined(TCP_NOPUSH)
#define TCP_NOPUSH TCP_CORK
@ -132,6 +133,8 @@ bool FileClient::enqueue_network_buffer_bytes(const void *snd_buffer, size_t siz
size_t buffer_size;
{
std::lock_guard block{this->network_buffer.mutex};
if(this->network_buffer.write_disconnected)
goto write_disconnected;
*this->network_buffer.buffer_tail = tbuffer;
this->network_buffer.buffer_tail = &tbuffer->next;
@ -140,6 +143,28 @@ bool FileClient::enqueue_network_buffer_bytes(const void *snd_buffer, size_t siz
this->add_network_write_event(false);
return buffer_size > TRANSFER_MAX_CACHED_BYTES;
write_disconnected:
free_buffer(tbuffer);
return false;
}
void FileClient::flush_network_buffer() {
Buffer* current_head;
{
std::lock_guard block{this->network_buffer.mutex};
this->network_buffer.write_disconnected = true;
this->network_buffer.bytes = 0;
current_head = std::exchange(this->network_buffer.buffer_head, nullptr);
this->network_buffer.buffer_tail = &this->network_buffer.buffer_head;
}
while(current_head) {
auto next = current_head->next;
free_buffer(current_head);
current_head = next;
}
}
NetworkingStartResult LocalFileTransfer::start_networking() {
@ -347,16 +372,18 @@ bool LocalFileTransfer::initialize_client_ssl(const std::shared_ptr<FileClient>
if(!ssl_option_supplier || !(options = ssl_option_supplier())) {
logError(0, "{} Failed to initialize client SSL pipe because we've no SSL options.", client->log_prefix());
client->flush_network_buffer(); /* invalidate all network write operations */
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client, slock, false);
client->handle->disconnect_client(client, slock, true);
return false;
}
if(!ssl_pipe.initialize(options, error)) {
logWarning(0, "{} Failed to initialize client SSL pipe ({}). Disconnecting client.", client->log_prefix(), error);
client->flush_network_buffer(); /* invalidate all network write operations */
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client, slock, false);
client->handle->disconnect_client(client, slock, true);
return false;
}
@ -470,7 +497,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
if(read == 0) {
std::unique_lock slock{transfer->state_mutex};
auto original_state = transfer->state;
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false);
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true);
slock.unlock();
switch(original_state) {
@ -505,7 +532,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
std::unique_lock slock{transfer->state_mutex};
auto original_state = transfer->state;
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false);
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true);
slock.unlock();
switch(original_state) {
@ -580,44 +607,20 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
if((unsigned) events & (unsigned) EV_TIMEOUT) {
if(transfer->state == FileClient::STATE_DISCONNECTING) {
{
std::lock_guard nb_lock{transfer->network_buffer.mutex};
std::lock_guard db_lock{transfer->disk_buffer.mutex};
if(!std::exchange(transfer->network_buffer.flushed, true)) {
std::unique_lock nb_lock{transfer->network_buffer.mutex};
if(transfer->network_buffer.bytes > 0) {
nb_lock.unlock();
transfer->flush_network_buffer();
debugMessage(LOG_FT, "{} Failed to flush networking buffer in given timeout. Marking it as flushed.", transfer->log_prefix());
}
if(!transfer->disk_buffer.flushed) {
logTrace(LOG_FT, "{} Disk IO hasn't been fully flushed yet, awaiting disk IO flush.", transfer->log_prefix());
return;
}
logTrace(LOG_FT, "{} Disk IO and network buffer have been flushed.", transfer->log_prefix());
}
{
std::unique_lock slock{transfer->state_mutex};
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false);
}
transfer->handle->test_disconnecting_state(transfer->shared_from_this());
return;
}
}
if((unsigned) events & (unsigned) EV_WRITE) {
if(transfer->state != FileClient::STATE_DISCONNECTING && transfer->state != FileClient::STATE_TRANSFERRING) {
if(!(transfer->state == FileClient::STATE_AWAITING_KEY && transfer->networking.protocol == FileClient::PROTOCOL_HTTPS)) {
debugMessage(LOG_FT, "{} Tried to write data to send only stream. Dropping buffers.", transfer->log_prefix());
std::unique_lock block{transfer->network_buffer.mutex};
auto head = std::exchange(transfer->network_buffer.buffer_head, nullptr);
transfer->network_buffer.buffer_tail = &transfer->network_buffer.buffer_head;
while(head) {
auto next = head->next;
free_buffer(head);
head = next;
}
return;
}
}
Buffer* buffer{nullptr};
size_t buffer_left_size{0};
@ -637,37 +640,30 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
assert(buffer->offset < buffer->length);
auto written = ::send(fd, buffer->data + buffer->offset, std::min(buffer->length - buffer->offset, max_write_bytes), MSG_DONTWAIT | MSG_NOSIGNAL);
if(written <= 0) {
if(transfer->state != FileClient::STATE_TRANSFERRING) {
std::unique_lock slock{transfer->state_mutex};
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false);
return;
if(errno == EAGAIN) {
transfer->add_network_write_event(false);
break;
}
if(written == 0) {
/* EOF, how the hell is this event possible?! (Read should already catch it) */
logError(LOG_FT, "{} Client disconnected unexpectedly on write. Send {} bytes out of {}.",
transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1);
if(transfer->state == FileClient::STATE_TRANSFERRING) {
assert(transfer->transfer);
if(written == 0) {
/* EOF, how the hell is this event possible?! (Read should already catch it) */
logError(LOG_FT, "{} Client disconnected unexpectedly on write. Send {} bytes out of {}.",
transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1);
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" });
{
std::unique_lock slock{transfer->state_mutex};
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false);
}
} else {
if(errno == EAGAIN) {
transfer->add_network_write_event(false);
break;
}
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" });
} else {
logError(LOG_FT, "{} Received network write error. Send {} bytes out of {}. Closing transfer.",
transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1);
logError(LOG_FT, "{} Received network write error. Send {} bytes out of {}. Closing transfer.",
transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1);
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) });
{
std::unique_lock slock{transfer->state_mutex};
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false);
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) });
}
}
transfer->flush_network_buffer(); /* invalidate all network write operations */
std::unique_lock slock{transfer->state_mutex};
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true);
return;
} else {
buffer->offset += written;
@ -699,22 +695,10 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
}
}
if(buffer_left_size > 0)
if(buffer_left_size > 0) {
transfer->add_network_write_event(false);
else if(transfer->state == FileClient::STATE_DISCONNECTING) {
{
std::lock_guard nb_lock{transfer->network_buffer.mutex};
std::lock_guard db_lock{transfer->disk_buffer.mutex};
if(std::exchange(transfer->network_buffer.flushed, true))
return;
if(!transfer->disk_buffer.flushed) {
debugMessage(LOG_FT, "{} Disk IO hasn't been fully flushed yet, awaiting disk IO flush.", transfer->log_prefix());
return;
}
debugMessage(LOG_FT, "{} Disk IO and network buffer have been flushed.", transfer->log_prefix());
}
} else if(transfer->state == FileClient::STATE_DISCONNECTING) {
transfer->handle->test_disconnecting_state(transfer->shared_from_this());
if(!std::exchange(transfer->finished_signal_send, true)) {
if(transfer->transfer && transfer->statistics.file_transferred.total_bytes + transfer->transfer->file_offset == transfer->transfer->expected_file_size) {
@ -724,7 +708,9 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
callback(transfer->transfer);
}
}
std::unique_lock slock{transfer->state_mutex};
/* no need to flush here, since we read only from the disk and all bytes which sould be send have been written already */
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false);
return;
}
@ -753,7 +739,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptr<FileCli
logWarning(LOG_FT, "{} Read bytes with unknown protocol. Closing connection.", client->log_prefix());
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, false);
client->handle->disconnect_client(client->shared_from_this(), slock, true);
return (size_t) -1;
}
@ -761,7 +747,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptr<FileCli
logWarning(LOG_FT, "{} Read bytes with unknown protocol but having not awaiting key state. Closing connection.", client->log_prefix());
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, false);
client->handle->disconnect_client(client->shared_from_this(), slock, true);
return (size_t) -1;
}
@ -831,7 +817,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptr<FileCli
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, false);
client->handle->disconnect_client(client->shared_from_this(), slock, true);
return (size_t) -1;
}
@ -871,9 +857,9 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr<FileClient>
if(!http::parse_request(std::string{header_view.data(), header_end}, request)) {
logError(LOG_FT, "{} Failed to parse HTTP request. Disconnecting client.", client->log_prefix());
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, true);
return (size_t) -1;
response.code = http::code::code(400, "Bad Request");
response.setHeader("x-error-message", { "failed to parse http request" });
goto send_response_exit;
}
if(auto header = request.findHeader("Sec-Fetch-Mode"); request.method == "OPTIONS") {