473 lines
20 KiB
C++
473 lines
20 KiB
C++
//
|
|
// Created by WolverinDEV on 04/05/2020.
|
|
//
|
|
|
|
#include <cassert>
|
|
#include <event.h>
|
|
#include <experimental/filesystem>
|
|
#include <log/LogUtils.h>
|
|
#include "./LocalFileProvider.h"
|
|
#include "./duration_utils.h"
|
|
#include "LocalFileProvider.h"
|
|
|
|
using namespace ts::server::file;
|
|
using namespace ts::server::file::transfer;
|
|
namespace fs = std::experimental::filesystem;
|
|
|
|
DiskIOStartResult LocalFileTransfer::start_disk_io() {
|
|
assert(this->disk_io.state == DiskIOLoopState::STOPPED);
|
|
|
|
this->disk_io.state = DiskIOLoopState::RUNNING;
|
|
this->disk_io.dispatch_thread = std::thread(&LocalFileTransfer::dispatch_loop_disk_io, this);
|
|
return DiskIOStartResult::SUCCESS;
|
|
}
|
|
|
|
void LocalFileTransfer::shutdown_disk_io() {
|
|
if(this->disk_io.state == DiskIOLoopState::STOPPED) return;
|
|
|
|
this->disk_io.state = DiskIOLoopState::STOPPING;
|
|
{
|
|
std::unique_lock qlock{this->disk_io.queue_lock};
|
|
this->disk_io.notify_work_awaiting.notify_all();
|
|
while(this->disk_io.queue_head)
|
|
this->disk_io.notify_client_processed.wait_for(qlock, std::chrono::seconds{10});
|
|
|
|
if(this->disk_io.queue_head) {
|
|
logWarning(0, "Failed to flush disk IO. Force aborting.");
|
|
this->disk_io.state = DiskIOLoopState::FORCE_STOPPING;
|
|
this->disk_io.notify_work_awaiting.notify_all();
|
|
this->disk_io.notify_client_processed.wait(qlock);
|
|
}
|
|
}
|
|
|
|
if(this->disk_io.dispatch_thread.joinable())
|
|
this->disk_io.dispatch_thread.join();
|
|
|
|
this->disk_io.state = DiskIOLoopState::STOPPED;
|
|
}
|
|
|
|
void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) {
|
|
auto provider = reinterpret_cast<LocalFileTransfer*>(provider_ptr);
|
|
|
|
std::shared_ptr<FileClient> client{};
|
|
while(true) {
|
|
{
|
|
std::unique_lock qlock{provider->disk_io.queue_lock};
|
|
if(client) {
|
|
client->file.currently_processing = false;
|
|
provider->disk_io.notify_client_processed.notify_all();
|
|
client = nullptr;
|
|
}
|
|
|
|
provider->disk_io.notify_work_awaiting.wait(qlock, [&]{ return provider->disk_io.state != DiskIOLoopState::RUNNING || provider->disk_io.queue_head != nullptr; });
|
|
|
|
if(provider->disk_io.queue_head) {
|
|
client = provider->disk_io.queue_head->shared_from_this();
|
|
|
|
provider->disk_io.queue_head = provider->disk_io.queue_head->file.next_client;
|
|
if(!provider->disk_io.queue_head)
|
|
provider->disk_io.queue_tail = &provider->disk_io.queue_head;
|
|
}
|
|
|
|
if(provider->disk_io.state != DiskIOLoopState::RUNNING) {
|
|
if(provider->disk_io.state == DiskIOLoopState::STOPPING) {
|
|
if(!client)
|
|
break;
|
|
/* break only if all clients have been flushed */
|
|
} else {
|
|
/* force stopping without any flushing */
|
|
auto fclient = &*client;
|
|
while(fclient)
|
|
fclient = std::exchange(fclient->file.next_client, nullptr);
|
|
|
|
provider->disk_io.queue_head = nullptr;
|
|
provider->disk_io.queue_tail = &provider->disk_io.queue_head;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if(!client)
|
|
continue;
|
|
|
|
client->file.currently_processing = true;
|
|
client->file.next_client = nullptr;
|
|
}
|
|
|
|
provider->execute_disk_io(client);
|
|
}
|
|
provider->disk_io.notify_client_processed.notify_all();
|
|
}
|
|
|
|
FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr<FileClient> &transfer) {
|
|
FileInitializeResult result{FileInitializeResult::SUCCESS};
|
|
assert(transfer->transfer);
|
|
|
|
std::shared_lock slock{transfer->state_mutex};
|
|
auto& file_data = transfer->file;
|
|
assert(!file_data.file_descriptor);
|
|
assert(!file_data.next_client);
|
|
|
|
{
|
|
unsigned int open_flags{0};
|
|
if(transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
|
|
open_flags = O_RDONLY;
|
|
|
|
std::error_code fs_error{};
|
|
if(file_data.absolute_path.empty() || !fs::exists(file_data.absolute_path, fs_error)) {
|
|
result = FileInitializeResult::FILE_DOES_NOT_EXISTS;
|
|
goto error_exit;
|
|
} else if(fs_error) {
|
|
logWarning(LOG_FT, "{} Failed to check for file existence of {}: {}/{}", transfer->log_prefix(), file_data.absolute_path, fs_error.value(), fs_error.message());
|
|
result = FileInitializeResult::FILE_SYSTEM_ERROR;
|
|
goto error_exit;
|
|
}
|
|
} else if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) {
|
|
open_flags = (unsigned) O_WRONLY | (unsigned) O_CREAT;
|
|
if(transfer->transfer->override_exiting)
|
|
open_flags |= (unsigned) O_TRUNC;
|
|
} else {
|
|
return FileInitializeResult::INVALID_TRANSFER_DIRECTION;
|
|
}
|
|
|
|
file_data.file_descriptor = open(file_data.absolute_path.c_str(), (int) open_flags, 0644);
|
|
if(file_data.file_descriptor <= 0) {
|
|
const auto errno_ = errno;
|
|
switch (errno_) {
|
|
case EACCES:
|
|
result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE;
|
|
break;
|
|
case EDQUOT:
|
|
logWarning(LOG_FT, "{} Disk inode limit has been reached. Failed to start file transfer for file {}", transfer->log_prefix(), file_data.absolute_path);
|
|
result = FileInitializeResult::FILE_SYSTEM_ERROR;
|
|
break;
|
|
case EISDIR:
|
|
result = FileInitializeResult::FILE_IS_A_DIRECTORY;
|
|
break;
|
|
case EMFILE:
|
|
result = FileInitializeResult::PROCESS_FILE_LIMIT_REACHED;
|
|
break;
|
|
case ENFILE:
|
|
result = FileInitializeResult::SYSTEM_FILE_LIMIT_REACHED;
|
|
break;
|
|
case ETXTBSY:
|
|
result = FileInitializeResult::FILE_IS_BUSY;
|
|
break;
|
|
case EROFS:
|
|
result = FileInitializeResult::DISK_IS_READ_ONLY;
|
|
break;
|
|
default:
|
|
logWarning(LOG_FT, "{} Failed to start file transfer for file {}: {}/{}", transfer->log_prefix(), file_data.absolute_path, errno_, strerror(errno_));
|
|
result = FileInitializeResult::FILE_SYSTEM_ERROR;
|
|
break;
|
|
}
|
|
goto error_exit;
|
|
}
|
|
}
|
|
|
|
this->file_system_.lock_file(transfer->file.absolute_path);
|
|
transfer->file.file_locked = true;
|
|
|
|
if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) {
|
|
if(ftruncate(file_data.file_descriptor, transfer->transfer->expected_file_size) != 0) {
|
|
const auto errno_ = errno;
|
|
switch (errno_) {
|
|
case EACCES:
|
|
logWarning(LOG_FT, "{} File {} got inaccessible on truncating, but not on opening.", transfer->log_prefix(), file_data.absolute_path);
|
|
result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE;
|
|
goto error_exit;
|
|
|
|
case EFBIG:
|
|
result = FileInitializeResult::FILE_TOO_LARGE;
|
|
goto error_exit;
|
|
|
|
case EIO:
|
|
logWarning(LOG_FT, "{} A disk IO error occurred while resizing file {}.", transfer->log_prefix(), file_data.absolute_path);
|
|
result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE;
|
|
goto error_exit;
|
|
|
|
case EROFS:
|
|
logWarning(LOG_FT, "{} Failed to resize file {} because disk is in read only mode.", transfer->log_prefix(), file_data.absolute_path);
|
|
result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE;
|
|
goto error_exit;
|
|
default:
|
|
debugMessage(LOG_FT, "{} Failed to truncate file {}: {}/{}. Trying to upload file anyways.", transfer->log_prefix(), file_data.absolute_path, errno_, strerror(errno_));
|
|
}
|
|
}
|
|
} else if(transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
|
|
auto file_size = lseek(file_data.file_descriptor, 0, SEEK_END);
|
|
if(file_size != transfer->transfer->expected_file_size) {
|
|
logWarning(LOG_FT, "{} Expected target file to be of size {}, but file is actually of size {}", transfer->log_prefix(), transfer->transfer->expected_file_size, file_size);
|
|
result = FileInitializeResult::FILE_SIZE_MISMATCH;
|
|
goto error_exit;
|
|
}
|
|
}
|
|
{
|
|
auto new_pos = lseek(file_data.file_descriptor, transfer->transfer->file_offset, SEEK_SET);
|
|
if(new_pos < 0) {
|
|
logWarning(LOG_FT, "{} Failed to seek to target file offset ({}): {}/{}", transfer->log_prefix(), transfer->transfer->file_offset, errno, strerror(errno));
|
|
result = FileInitializeResult::FILE_SEEK_FAILED;
|
|
goto error_exit;
|
|
} else if(new_pos != transfer->transfer->file_offset) {
|
|
logWarning(LOG_FT, "{} File rw offset mismatch after seek. Expected {} but received {}", transfer->log_prefix(), transfer->transfer->file_offset, new_pos);
|
|
result = FileInitializeResult::FILE_SEEK_FAILED;
|
|
goto error_exit;
|
|
}
|
|
debugMessage(LOG_FT, "{} Seek to file offset {}. New actual offset is {}", transfer->log_prefix(), transfer->transfer->file_offset, new_pos);
|
|
}
|
|
|
|
return FileInitializeResult::SUCCESS;
|
|
error_exit:
|
|
if(std::exchange(transfer->file.file_locked, false))
|
|
this->file_system_.unlock_file(transfer->file.absolute_path);
|
|
|
|
if(file_data.file_descriptor > 0)
|
|
::close(file_data.file_descriptor);
|
|
file_data.file_descriptor = 0;
|
|
|
|
return result;
|
|
}
|
|
|
|
void LocalFileTransfer::finalize_file_io(const std::shared_ptr<FileClient> &transfer,
|
|
std::unique_lock<std::shared_mutex> &state_lock) {
|
|
assert(state_lock.owns_lock());
|
|
|
|
auto& file_data = transfer->file;
|
|
|
|
state_lock.unlock();
|
|
{
|
|
std::unique_lock dlock{this->disk_io.queue_lock};
|
|
while(true) {
|
|
if(file_data.currently_processing) {
|
|
this->disk_io.notify_client_processed.wait(dlock);
|
|
continue;
|
|
}
|
|
|
|
if(file_data.next_client) {
|
|
if(this->disk_io.queue_head == &*transfer) {
|
|
this->disk_io.queue_head = file_data.next_client;
|
|
if(!this->disk_io.queue_head)
|
|
this->disk_io.queue_tail = &this->disk_io.queue_head;
|
|
} else {
|
|
FileClient* head{this->disk_io.queue_head};
|
|
while(head->file.next_client != &*transfer) {
|
|
assert(head->file.next_client);
|
|
head = head->file.next_client;
|
|
}
|
|
|
|
head->file.next_client = file_data.next_client;
|
|
if(!file_data.next_client)
|
|
this->disk_io.queue_tail = &head->file.next_client;
|
|
|
|
}
|
|
file_data.next_client = nullptr;
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
state_lock.lock();
|
|
|
|
if(std::exchange(file_data.file_locked, false))
|
|
this->file_system_.unlock_file(file_data.absolute_path);
|
|
|
|
if(file_data.file_descriptor > 0)
|
|
::close(file_data.file_descriptor);
|
|
file_data.file_descriptor = 0;
|
|
}
|
|
|
|
void LocalFileTransfer::enqueue_disk_io(const std::shared_ptr<FileClient> &client) {
|
|
if(!client->file.file_descriptor)
|
|
return;
|
|
|
|
if(!client->transfer)
|
|
return;
|
|
|
|
if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
|
|
if(client->state != FileClient::STATE_TRANSFERRING)
|
|
return;
|
|
|
|
if(client->buffer.bytes > TRANSFER_MAX_CACHED_BYTES)
|
|
return;
|
|
} else if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) {
|
|
/* we don't do this check because this might be a flush instruction, where the buffer is actually zero bytes filled */
|
|
/*
|
|
if(client->buffer.bytes == 0)
|
|
return;
|
|
*/
|
|
}
|
|
|
|
std::lock_guard dlock{this->disk_io.queue_lock};
|
|
if(client->file.next_client || this->disk_io.queue_tail == &client->file.next_client)
|
|
return;
|
|
|
|
*this->disk_io.queue_tail = &*client;
|
|
this->disk_io.queue_tail = &client->file.next_client;
|
|
|
|
this->disk_io.notify_work_awaiting.notify_all();
|
|
}
|
|
|
|
void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &client) {
|
|
if(!client->transfer) return;
|
|
|
|
if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) {
|
|
Buffer* buffer{nullptr};
|
|
size_t buffer_left_size{0};
|
|
|
|
while(true) {
|
|
{
|
|
std::lock_guard block{client->buffer.mutex};
|
|
buffer = client->buffer.buffer_head;
|
|
buffer_left_size = client->buffer.bytes;
|
|
}
|
|
if(!buffer) {
|
|
assert(buffer_left_size == 0);
|
|
break;
|
|
}
|
|
|
|
assert(buffer->offset < buffer->length);
|
|
auto written = ::write(client->file.file_descriptor, buffer->data + buffer->offset, buffer->length - buffer->offset);
|
|
if(written <= 0) {
|
|
if(written == 0) {
|
|
/* EOF, how the hell is this event possible?! */
|
|
auto offset_written = client->statistics.disk_bytes_write + client->transfer->file_offset;
|
|
auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR);
|
|
logError(LOG_FT, "{} Received unexpected file write EOF. EOF received at {} but expected {}. Actual file offset: {}. Closing transfer.",
|
|
client->log_prefix(), offset_written, client->transfer->expected_file_size, aoffset);
|
|
|
|
this->report_transfer_statistics(client);
|
|
if(auto callback{client->handle->callback_transfer_aborted}; callback)
|
|
callback(client->transfer, { TransferError::UNEXPECTED_DISK_EOF, strerror(errno) });
|
|
|
|
{
|
|
std::unique_lock slock{client->state_mutex};
|
|
client->handle->disconnect_client(client, slock, true);
|
|
}
|
|
} else {
|
|
if(errno == EAGAIN) {
|
|
//TODO: Timeout?
|
|
this->enqueue_disk_io(client);
|
|
break;
|
|
}
|
|
|
|
auto offset_written = client->statistics.disk_bytes_write + client->transfer->file_offset;
|
|
auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR);
|
|
logError(LOG_FT, "{} Received write to disk IO error. Write pointer is at {} of {}. Actual file offset: {}. Closing transfer.",
|
|
client->log_prefix(), offset_written, client->transfer->expected_file_size, aoffset);
|
|
|
|
this->report_transfer_statistics(client);
|
|
if(auto callback{client->handle->callback_transfer_aborted}; callback)
|
|
callback(client->transfer, { TransferError::DISK_IO_ERROR, strerror(errno) });
|
|
|
|
{
|
|
std::unique_lock slock{client->state_mutex};
|
|
client->handle->disconnect_client(client, slock, true);
|
|
}
|
|
}
|
|
return;
|
|
} else {
|
|
buffer->offset += written;
|
|
assert(buffer->offset <= buffer->length);
|
|
if(buffer->length == buffer->offset) {
|
|
{
|
|
std::lock_guard block{client->buffer.mutex};
|
|
client->buffer.buffer_head = buffer->next;
|
|
if(!buffer->next)
|
|
client->buffer.buffer_tail = &client->buffer.buffer_head;
|
|
|
|
assert(client->buffer.bytes >= written);
|
|
client->buffer.bytes -= written;
|
|
buffer_left_size = client->buffer.bytes;
|
|
(void) buffer_left_size; /* trick my IDE here a bit */
|
|
}
|
|
|
|
free_buffer(buffer);
|
|
} else {
|
|
std::lock_guard block{client->buffer.mutex};
|
|
assert(client->buffer.bytes >= written);
|
|
client->buffer.bytes -= written;
|
|
buffer_left_size = client->buffer.bytes;
|
|
(void) buffer_left_size; /* trick my IDE here a bit */
|
|
}
|
|
|
|
client->statistics.disk_bytes_write += written;
|
|
}
|
|
}
|
|
|
|
if(buffer_left_size > 0)
|
|
this->enqueue_disk_io(client);
|
|
else if(client->state == FileClient::STATE_DISCONNECTING) {
|
|
debugMessage(LOG_FT, "{} Disk IO has been flushed.", client->log_prefix());
|
|
|
|
std::unique_lock slock{client->state_mutex};
|
|
client->handle->disconnect_client(client->shared_from_this(), slock, false);
|
|
}
|
|
|
|
if(client->state == FileClient::STATE_TRANSFERRING && buffer_left_size < TRANSFER_MAX_CACHED_BYTES / 2) {
|
|
if(client->buffer.buffering_stopped)
|
|
logMessage(LOG_FT, "{} Starting network read, buffer is capable for reading again.", client->log_prefix());
|
|
client->add_network_read_event(false);
|
|
}
|
|
} else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
|
|
while(true) {
|
|
constexpr auto buffer_capacity{4096};
|
|
char buffer[buffer_capacity];
|
|
|
|
auto read = ::read(client->file.file_descriptor, buffer, buffer_capacity);
|
|
if(read <= 0) {
|
|
if(read == 0) {
|
|
/* EOF */
|
|
auto offset_send = client->statistics.disk_bytes_read + client->transfer->file_offset;
|
|
if(client->transfer->expected_file_size == offset_send) {
|
|
debugMessage(LOG_FT, "{} Finished file reading. Flushing and disconnecting transfer. Reading took {} seconds.",
|
|
client->log_prefix(), duration_to_string(std::chrono::system_clock::now() - client->timings.key_received));
|
|
} else {
|
|
auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR);
|
|
logError(LOG_FT, "{} Received unexpected read EOF. EOF received at {} but expected {}. Actual file offset: {}. Disconnecting client.",
|
|
client->log_prefix(), offset_send, client->transfer->expected_file_size, aoffset);
|
|
|
|
this->report_transfer_statistics(client);
|
|
if(auto callback{client->handle->callback_transfer_aborted}; callback)
|
|
callback(client->transfer, { TransferError::UNEXPECTED_DISK_EOF, strerror(errno) });
|
|
}
|
|
|
|
{
|
|
std::unique_lock slock{client->state_mutex};
|
|
client->handle->disconnect_client(client, slock, true);
|
|
}
|
|
} else {
|
|
if(errno == EAGAIN) {
|
|
this->enqueue_disk_io(client);
|
|
return;
|
|
}
|
|
|
|
logWarning(LOG_FT, "{} Failed to read from file {} ({}/{}). Aborting transfer.", client->log_prefix(), client->file.absolute_path, errno, strerror(errno));
|
|
|
|
this->report_transfer_statistics(client);
|
|
if(auto callback{client->handle->callback_transfer_aborted}; callback)
|
|
callback(client->transfer, { TransferError::DISK_IO_ERROR, strerror(errno) });
|
|
|
|
{
|
|
std::unique_lock slock{client->state_mutex};
|
|
client->handle->disconnect_client(client, slock, true);
|
|
}
|
|
}
|
|
return;
|
|
} else {
|
|
auto buffer_full = client->send_file_bytes(buffer, read);
|
|
client->statistics.disk_bytes_read += read;
|
|
client->statistics.file_bytes_transferred += read;
|
|
|
|
std::shared_lock slock{client->state_mutex};
|
|
if(buffer_full) {
|
|
logMessage(LOG_FT, "{} Stopping buffering from disk. Buffer full ({}bytes)", client->log_prefix(), client->buffer.bytes);
|
|
break;
|
|
}
|
|
|
|
/* we've stuff to write again, yeahr */
|
|
client->add_network_write_event(false);
|
|
}
|
|
}
|
|
} else {
|
|
logError(LOG_FT, "{} Disk IO scheduled, but transfer direction is unknown.", client->log_prefix());
|
|
}
|
|
} |