// // Created by WolverinDEV on 04/05/2020. // #include #include #include #include #include "./LocalFileProvider.h" #include "./duration_utils.h" #include "LocalFileProvider.h" using namespace ts::server::file; using namespace ts::server::file::transfer; namespace fs = std::experimental::filesystem; DiskIOStartResult LocalFileTransfer::start_disk_io() { assert(this->disk_io.state == DiskIOLoopState::STOPPED); this->disk_io.state = DiskIOLoopState::RUNNING; this->disk_io.dispatch_thread = std::thread(&LocalFileTransfer::dispatch_loop_disk_io, this); return DiskIOStartResult::SUCCESS; } void LocalFileTransfer::shutdown_disk_io() { if(this->disk_io.state == DiskIOLoopState::STOPPED) return; this->disk_io.state = DiskIOLoopState::STOPPING; { std::unique_lock qlock{this->disk_io.queue_lock}; this->disk_io.notify_work_awaiting.notify_all(); while(this->disk_io.queue_head) this->disk_io.notify_client_processed.wait_for(qlock, std::chrono::seconds{10}); if(this->disk_io.queue_head) { logWarning(0, "Failed to flush disk IO. Force aborting."); this->disk_io.state = DiskIOLoopState::FORCE_STOPPING; this->disk_io.notify_work_awaiting.notify_all(); this->disk_io.notify_client_processed.wait(qlock); } } if(this->disk_io.dispatch_thread.joinable()) this->disk_io.dispatch_thread.join(); this->disk_io.state = DiskIOLoopState::STOPPED; } void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) { auto provider = reinterpret_cast(provider_ptr); std::shared_ptr client{}; while(true) { { std::unique_lock qlock{provider->disk_io.queue_lock}; if(client) { client->file.currently_processing = false; provider->disk_io.notify_client_processed.notify_all(); client = nullptr; } provider->disk_io.notify_work_awaiting.wait(qlock, [&]{ return provider->disk_io.state != DiskIOLoopState::RUNNING || provider->disk_io.queue_head != nullptr; }); if(provider->disk_io.queue_head) { client = provider->disk_io.queue_head->shared_from_this(); provider->disk_io.queue_head = provider->disk_io.queue_head->file.next_client; if(!provider->disk_io.queue_head) provider->disk_io.queue_tail = &provider->disk_io.queue_head; } if(provider->disk_io.state != DiskIOLoopState::RUNNING) { if(provider->disk_io.state == DiskIOLoopState::STOPPING) { if(!client) break; /* break only if all clients have been flushed */ } else { /* force stopping without any flushing */ auto fclient = &*client; while(fclient) fclient = std::exchange(fclient->file.next_client, nullptr); provider->disk_io.queue_head = nullptr; provider->disk_io.queue_tail = &provider->disk_io.queue_head; break; } } if(!client) continue; client->file.currently_processing = true; client->file.next_client = nullptr; } provider->execute_disk_io(client); } provider->disk_io.notify_client_processed.notify_all(); } FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr &transfer) { FileInitializeResult result{FileInitializeResult::SUCCESS}; assert(transfer->transfer); std::shared_lock slock{transfer->state_mutex}; auto& file_data = transfer->file; assert(!file_data.file_descriptor); assert(!file_data.next_client); auto absolute_path = transfer->transfer->absolute_file_path; { unsigned int open_flags{0}; if(transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { open_flags = O_RDONLY; std::error_code fs_error{}; if(absolute_path.empty() || !fs::exists(absolute_path, fs_error)) { result = FileInitializeResult::FILE_DOES_NOT_EXISTS; goto error_exit; } else if(fs_error) { logWarning(LOG_FT, "{} Failed to check for file existence of {}: {}/{}", transfer->log_prefix(), absolute_path, fs_error.value(), fs_error.message()); result = FileInitializeResult::FILE_SYSTEM_ERROR; goto error_exit; } } else if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) { open_flags = (unsigned) O_WRONLY | (unsigned) O_CREAT; if(transfer->transfer->override_exiting) open_flags |= (unsigned) O_TRUNC; } else { return FileInitializeResult::INVALID_TRANSFER_DIRECTION; } file_data.file_descriptor = open(absolute_path.c_str(), (int) open_flags, 0644); if(file_data.file_descriptor <= 0) { const auto errno_ = errno; switch (errno_) { case EACCES: result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE; break; case EDQUOT: logWarning(LOG_FT, "{} Disk inode limit has been reached. Failed to start file transfer for file {}", transfer->log_prefix(), absolute_path); result = FileInitializeResult::FILE_SYSTEM_ERROR; break; case EISDIR: result = FileInitializeResult::FILE_IS_A_DIRECTORY; break; case EMFILE: result = FileInitializeResult::PROCESS_FILE_LIMIT_REACHED; break; case ENFILE: result = FileInitializeResult::SYSTEM_FILE_LIMIT_REACHED; break; case ETXTBSY: result = FileInitializeResult::FILE_IS_BUSY; break; case EROFS: result = FileInitializeResult::DISK_IS_READ_ONLY; break; default: logWarning(LOG_FT, "{} Failed to start file transfer for file {}: {}/{}", transfer->log_prefix(), absolute_path, errno_, strerror(errno_)); result = FileInitializeResult::FILE_SYSTEM_ERROR; break; } goto error_exit; } } this->file_system_.lock_file(absolute_path); transfer->file.file_locked = true; if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) { if(ftruncate(file_data.file_descriptor, transfer->transfer->expected_file_size) != 0) { const auto errno_ = errno; switch (errno_) { case EACCES: logWarning(LOG_FT, "{} File {} got inaccessible on truncating, but not on opening.", transfer->log_prefix(), absolute_path); result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE; goto error_exit; case EFBIG: result = FileInitializeResult::FILE_TOO_LARGE; goto error_exit; case EIO: logWarning(LOG_FT, "{} A disk IO error occurred while resizing file {}.", transfer->log_prefix(), absolute_path); result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE; goto error_exit; case EROFS: logWarning(LOG_FT, "{} Failed to resize file {} because disk is in read only mode.", transfer->log_prefix(), absolute_path); result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE; goto error_exit; default: debugMessage(LOG_FT, "{} Failed to truncate file {}: {}/{}. Trying to upload file anyways.", transfer->log_prefix(), absolute_path, errno_, strerror(errno_)); } } } else if(transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { auto file_size = lseek(file_data.file_descriptor, 0, SEEK_END); if(file_size != transfer->transfer->expected_file_size) { logWarning(LOG_FT, "{} Expected target file to be of size {}, but file is actually of size {}", transfer->log_prefix(), transfer->transfer->expected_file_size, file_size); result = FileInitializeResult::FILE_SIZE_MISMATCH; goto error_exit; } } { auto new_pos = lseek(file_data.file_descriptor, transfer->transfer->file_offset, SEEK_SET); if(new_pos < 0) { logWarning(LOG_FT, "{} Failed to seek to target file offset ({}): {}/{}", transfer->log_prefix(), transfer->transfer->file_offset, errno, strerror(errno)); result = FileInitializeResult::FILE_SEEK_FAILED; goto error_exit; } else if(new_pos != transfer->transfer->file_offset) { logWarning(LOG_FT, "{} File rw offset mismatch after seek. Expected {} but received {}", transfer->log_prefix(), transfer->transfer->file_offset, new_pos); result = FileInitializeResult::FILE_SEEK_FAILED; goto error_exit; } debugMessage(LOG_FT, "{} Seek to file offset {}. New actual offset is {}", transfer->log_prefix(), transfer->transfer->file_offset, new_pos); } return FileInitializeResult::SUCCESS; error_exit: if(std::exchange(transfer->file.file_locked, false)) this->file_system_.unlock_file(absolute_path); if(file_data.file_descriptor > 0) ::close(file_data.file_descriptor); file_data.file_descriptor = 0; return result; } void LocalFileTransfer::finalize_file_io(const std::shared_ptr &transfer, std::unique_lock &state_lock) { assert(state_lock.owns_lock()); auto absolute_path = transfer->transfer->absolute_file_path; auto& file_data = transfer->file; state_lock.unlock(); { std::unique_lock dlock{this->disk_io.queue_lock}; while(true) { if(file_data.currently_processing) { this->disk_io.notify_client_processed.wait(dlock); continue; } if(file_data.next_client) { if(this->disk_io.queue_head == &*transfer) { this->disk_io.queue_head = file_data.next_client; if(!this->disk_io.queue_head) this->disk_io.queue_tail = &this->disk_io.queue_head; } else { FileClient* head{this->disk_io.queue_head}; while(head->file.next_client != &*transfer) { assert(head->file.next_client); head = head->file.next_client; } head->file.next_client = file_data.next_client; if(!file_data.next_client) this->disk_io.queue_tail = &head->file.next_client; } file_data.next_client = nullptr; } break; } } state_lock.lock(); if(std::exchange(file_data.file_locked, false)) this->file_system_.unlock_file(absolute_path); if(file_data.file_descriptor > 0) ::close(file_data.file_descriptor); file_data.file_descriptor = 0; } void LocalFileTransfer::enqueue_disk_io(const std::shared_ptr &client) { if(!client->file.file_descriptor) return; if(!client->transfer) return; if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { if(client->state != FileClient::STATE_TRANSFERRING) return; if(client->buffer.bytes > TRANSFER_MAX_CACHED_BYTES) return; } else if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) { /* we don't do this check because this might be a flush instruction, where the buffer is actually zero bytes filled */ /* if(client->buffer.bytes == 0) return; */ } std::lock_guard dlock{this->disk_io.queue_lock}; if(client->file.next_client || this->disk_io.queue_tail == &client->file.next_client) return; *this->disk_io.queue_tail = &*client; this->disk_io.queue_tail = &client->file.next_client; this->disk_io.notify_work_awaiting.notify_all(); } void LocalFileTransfer::execute_disk_io(const std::shared_ptr &client) { if(!client->transfer) return; if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) { Buffer* buffer{nullptr}; size_t buffer_left_size{0}; while(true) { { std::lock_guard block{client->buffer.mutex}; buffer = client->buffer.buffer_head; buffer_left_size = client->buffer.bytes; } if(!buffer) { assert(buffer_left_size == 0); break; } assert(buffer->offset < buffer->length); auto written = ::write(client->file.file_descriptor, buffer->data + buffer->offset, buffer->length - buffer->offset); if(written <= 0) { if(written == 0) { /* EOF, how the hell is this event possible?! */ auto offset_written = client->statistics.disk_bytes_write.total_bytes + client->transfer->file_offset; auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR); logError(LOG_FT, "{} Received unexpected file write EOF. EOF received at {} but expected {}. Actual file offset: {}. Closing transfer.", client->log_prefix(), offset_written, client->transfer->expected_file_size, aoffset); this->report_transfer_statistics(client); if(auto callback{client->handle->callback_transfer_aborted}; callback) callback(client->transfer, { TransferError::UNEXPECTED_DISK_EOF, strerror(errno) }); { std::unique_lock slock{client->state_mutex}; client->handle->disconnect_client(client, slock, true); } } else { if(errno == EAGAIN) { //TODO: Timeout? this->enqueue_disk_io(client); break; } auto offset_written = client->statistics.disk_bytes_write.total_bytes + client->transfer->file_offset; auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR); logError(LOG_FT, "{} Received write to disk IO error. Write pointer is at {} of {}. Actual file offset: {}. Closing transfer.", client->log_prefix(), offset_written, client->transfer->expected_file_size, aoffset); this->report_transfer_statistics(client); if(auto callback{client->handle->callback_transfer_aborted}; callback) callback(client->transfer, { TransferError::DISK_IO_ERROR, strerror(errno) }); { std::unique_lock slock{client->state_mutex}; client->handle->disconnect_client(client, slock, true); } } return; } else { buffer->offset += written; assert(buffer->offset <= buffer->length); if(buffer->length == buffer->offset) { { std::lock_guard block{client->buffer.mutex}; client->buffer.buffer_head = buffer->next; if(!buffer->next) client->buffer.buffer_tail = &client->buffer.buffer_head; assert(client->buffer.bytes >= written); client->buffer.bytes -= written; buffer_left_size = client->buffer.bytes; (void) buffer_left_size; /* trick my IDE here a bit */ } free_buffer(buffer); } else { std::lock_guard block{client->buffer.mutex}; assert(client->buffer.bytes >= written); client->buffer.bytes -= written; buffer_left_size = client->buffer.bytes; (void) buffer_left_size; /* trick my IDE here a bit */ } client->statistics.disk_bytes_write.increase_bytes(written); } } if(buffer_left_size > 0) this->enqueue_disk_io(client); else if(client->state == FileClient::STATE_DISCONNECTING) { debugMessage(LOG_FT, "{} Disk IO has been flushed.", client->log_prefix()); std::unique_lock slock{client->state_mutex}; client->handle->disconnect_client(client->shared_from_this(), slock, false); } if(client->state == FileClient::STATE_TRANSFERRING && buffer_left_size < TRANSFER_MAX_CACHED_BYTES / 2) { if(client->buffer.buffering_stopped) logMessage(LOG_FT, "{} Starting network read, buffer is capable for reading again.", client->log_prefix()); client->add_network_read_event(false); } } else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { if(client->state == FileClient::STATE_DISCONNECTING) return; while(true) { constexpr auto buffer_capacity{4096}; char buffer[buffer_capacity]; auto read = ::read(client->file.file_descriptor, buffer, buffer_capacity); if(read <= 0) { if(read == 0) { /* EOF */ auto offset_send = client->statistics.disk_bytes_read.total_bytes + client->transfer->file_offset; if(client->transfer->expected_file_size == offset_send) { debugMessage(LOG_FT, "{} Finished file reading. Flushing and disconnecting transfer. Reading took {} seconds.", client->log_prefix(), duration_to_string(std::chrono::system_clock::now() - client->timings.key_received)); } else { auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR); logError(LOG_FT, "{} Received unexpected read EOF. EOF received at {} but expected {}. Actual file offset: {}. Disconnecting client.", client->log_prefix(), offset_send, client->transfer->expected_file_size, aoffset); this->report_transfer_statistics(client); if(auto callback{client->handle->callback_transfer_aborted}; callback) callback(client->transfer, { TransferError::UNEXPECTED_DISK_EOF, strerror(errno) }); } { std::unique_lock slock{client->state_mutex}; client->handle->disconnect_client(client, slock, true); } } else { if(errno == EAGAIN) { this->enqueue_disk_io(client); return; } logWarning(LOG_FT, "{} Failed to read from file {} ({}/{}). Aborting transfer.", client->log_prefix(), client->transfer->absolute_file_path, errno, strerror(errno)); this->report_transfer_statistics(client); if(auto callback{client->handle->callback_transfer_aborted}; callback) callback(client->transfer, { TransferError::DISK_IO_ERROR, strerror(errno) }); { std::unique_lock slock{client->state_mutex}; client->handle->disconnect_client(client, slock, true); } } return; } else { auto buffer_full = client->send_file_bytes(buffer, read); client->statistics.disk_bytes_read.increase_bytes(read); client->statistics.file_transferred.increase_bytes(read); std::shared_lock slock{client->state_mutex}; if(buffer_full) { //logTrace(LOG_FT, "{} Stopping buffering from disk. Buffer full ({}bytes)", client->log_prefix(), client->buffer.bytes); break; } /* we've stuff to write again, yeahr */ client->add_network_write_event(false); } } } else { logError(LOG_FT, "{} Disk IO scheduled, but transfer direction is unknown.", client->log_prefix()); } }