Some bug fixes and final version before release
This commit is contained in:
@@ -98,6 +98,24 @@ void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) {
|
||||
provider->disk_io.notify_client_processed.notify_all();
|
||||
}
|
||||
|
||||
bool FileClient::enqueue_disk_buffer_bytes(const void *snd_buffer, size_t size) {
|
||||
auto tbuffer = allocate_buffer(size);
|
||||
tbuffer->length = size;
|
||||
tbuffer->offset = 0;
|
||||
memcpy(tbuffer->data, snd_buffer, size);
|
||||
|
||||
size_t buffer_size;
|
||||
{
|
||||
std::lock_guard block{this->disk_buffer.mutex};
|
||||
*this->disk_buffer.buffer_tail = tbuffer;
|
||||
this->disk_buffer.buffer_tail = &tbuffer->next;
|
||||
|
||||
buffer_size = (this->disk_buffer.bytes += size);
|
||||
}
|
||||
|
||||
return buffer_size > TRANSFER_MAX_CACHED_BYTES;
|
||||
}
|
||||
|
||||
FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr<FileClient> &transfer) {
|
||||
FileInitializeResult result{FileInitializeResult::SUCCESS};
|
||||
assert(transfer->transfer);
|
||||
@@ -123,6 +141,18 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr
|
||||
goto error_exit;
|
||||
}
|
||||
} else if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) {
|
||||
std::error_code fs_error{};
|
||||
auto parent_path = fs::u8path(absolute_path).parent_path();
|
||||
if(!fs::exists(parent_path)) {
|
||||
if(!fs::create_directories(parent_path, fs_error) || fs_error) {
|
||||
logError(LOG_FT, "{} Failed to create required directories for file upload for {}: {}/{}", transfer->log_prefix(), absolute_path, fs_error.value(), fs_error.message());
|
||||
result = FileInitializeResult::COUNT_NOT_CREATE_DIRECTORIES;
|
||||
goto error_exit;
|
||||
}
|
||||
} else if(fs_error) {
|
||||
logWarning(LOG_FT, "{} Failed to check for directory existence of {}: {}/{}. Assuming it exists", transfer->log_prefix(), parent_path.string(), fs_error.value(), fs_error.message());
|
||||
}
|
||||
|
||||
open_flags = (unsigned) O_WRONLY | (unsigned) O_CREAT;
|
||||
if(transfer->transfer->override_exiting)
|
||||
open_flags |= (unsigned) O_TRUNC;
|
||||
@@ -157,7 +187,8 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr
|
||||
result = FileInitializeResult::DISK_IS_READ_ONLY;
|
||||
break;
|
||||
default:
|
||||
logWarning(LOG_FT, "{} Failed to start file transfer for file {}: {}/{}", transfer->log_prefix(), absolute_path, errno_, strerror(errno_));
|
||||
logWarning(LOG_FT, "{} Failed to start file {} for file {}: {}/{}", transfer->log_prefix(),
|
||||
transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD ? "download" : "upload", absolute_path, errno_, strerror(errno_));
|
||||
result = FileInitializeResult::FILE_SYSTEM_ERROR;
|
||||
break;
|
||||
}
|
||||
@@ -195,13 +226,32 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr
|
||||
}
|
||||
}
|
||||
} else if(transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
|
||||
transfer->disk_buffer.flushed = true; /* we're not using this buffer, so it will be flushed all the times */
|
||||
|
||||
auto file_size = lseek(file_data.file_descriptor, 0, SEEK_END);
|
||||
if(file_size != transfer->transfer->expected_file_size) {
|
||||
logWarning(LOG_FT, "{} Expected target file to be of size {}, but file is actually of size {}", transfer->log_prefix(), transfer->transfer->expected_file_size, file_size);
|
||||
result = FileInitializeResult::FILE_SIZE_MISMATCH;
|
||||
goto error_exit;
|
||||
}
|
||||
if(transfer->file.query_media_bytes && file_size > 0) {
|
||||
auto new_pos = lseek(file_data.file_descriptor, 0, SEEK_SET);
|
||||
if(new_pos < 0) {
|
||||
logWarning(LOG_FT, "{} Failed to seek to file start: {}/{}", transfer->log_prefix(), errno, strerror(errno));
|
||||
result = FileInitializeResult::FILE_SEEK_FAILED;
|
||||
goto error_exit;
|
||||
}
|
||||
|
||||
auto read = ::read(file_data.file_descriptor, transfer->file.media_bytes, TRANSFER_MEDIA_BYTES_LENGTH);
|
||||
if(read <= 0) {
|
||||
logWarning(LOG_FT, "{} Failed to read file media bytes: {}/{}", transfer->log_prefix(), errno, strerror(errno));
|
||||
result = FileInitializeResult::FAILED_TO_READ_MEDIA_BYTES;
|
||||
goto error_exit;
|
||||
}
|
||||
transfer->file.media_bytes_length = read;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto new_pos = lseek(file_data.file_descriptor, transfer->transfer->file_offset, SEEK_SET);
|
||||
if(new_pos < 0) {
|
||||
@@ -213,7 +263,7 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr
|
||||
result = FileInitializeResult::FILE_SEEK_FAILED;
|
||||
goto error_exit;
|
||||
}
|
||||
debugMessage(LOG_FT, "{} Seek to file offset {}. New actual offset is {}", transfer->log_prefix(), transfer->transfer->file_offset, new_pos);
|
||||
logTrace(LOG_FT, "{} Seek to file offset {}. New actual offset is {}", transfer->log_prefix(), transfer->transfer->file_offset, new_pos);
|
||||
}
|
||||
|
||||
return FileInitializeResult::SUCCESS;
|
||||
@@ -231,6 +281,9 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr
|
||||
void LocalFileTransfer::finalize_file_io(const std::shared_ptr<FileClient> &transfer,
|
||||
std::unique_lock<std::shared_mutex> &state_lock) {
|
||||
assert(state_lock.owns_lock());
|
||||
if(!transfer->transfer)
|
||||
return;
|
||||
|
||||
auto absolute_path = transfer->transfer->absolute_file_path;
|
||||
|
||||
auto& file_data = transfer->file;
|
||||
@@ -244,26 +297,22 @@ void LocalFileTransfer::finalize_file_io(const std::shared_ptr<FileClient> &tran
|
||||
continue;
|
||||
}
|
||||
|
||||
if(file_data.next_client) {
|
||||
if(this->disk_io.queue_head == &*transfer) {
|
||||
this->disk_io.queue_head = file_data.next_client;
|
||||
if(!this->disk_io.queue_head)
|
||||
this->disk_io.queue_tail = &this->disk_io.queue_head;
|
||||
} else {
|
||||
FileClient* head{this->disk_io.queue_head};
|
||||
while(head->file.next_client != &*transfer) {
|
||||
assert(head->file.next_client);
|
||||
head = head->file.next_client;
|
||||
}
|
||||
|
||||
head->file.next_client = file_data.next_client;
|
||||
if(!file_data.next_client)
|
||||
this->disk_io.queue_tail = &head->file.next_client;
|
||||
|
||||
if(this->disk_io.queue_head == &*transfer) {
|
||||
this->disk_io.queue_head = file_data.next_client;
|
||||
if (!this->disk_io.queue_head)
|
||||
this->disk_io.queue_tail = &this->disk_io.queue_head;
|
||||
} else if(file_data.next_client || this->disk_io.queue_tail == &file_data.next_client) {
|
||||
FileClient* head{this->disk_io.queue_head};
|
||||
while(head->file.next_client != &*transfer) {
|
||||
assert(head->file.next_client);
|
||||
head = head->file.next_client;
|
||||
}
|
||||
file_data.next_client = nullptr;
|
||||
}
|
||||
|
||||
head->file.next_client = file_data.next_client;
|
||||
if(!file_data.next_client)
|
||||
this->disk_io.queue_tail = &head->file.next_client;
|
||||
}
|
||||
file_data.next_client = nullptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -288,7 +337,7 @@ void LocalFileTransfer::enqueue_disk_io(const std::shared_ptr<FileClient> &clien
|
||||
if(client->state != FileClient::STATE_TRANSFERRING)
|
||||
return;
|
||||
|
||||
if(client->buffer.bytes > TRANSFER_MAX_CACHED_BYTES)
|
||||
if(client->disk_buffer.bytes > TRANSFER_MAX_CACHED_BYTES)
|
||||
return;
|
||||
} else if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) {
|
||||
/* we don't do this check because this might be a flush instruction, where the buffer is actually zero bytes filled */
|
||||
@@ -317,9 +366,9 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
|
||||
|
||||
while(true) {
|
||||
{
|
||||
std::lock_guard block{client->buffer.mutex};
|
||||
buffer = client->buffer.buffer_head;
|
||||
buffer_left_size = client->buffer.bytes;
|
||||
std::lock_guard block{client->disk_buffer.mutex};
|
||||
buffer = client->disk_buffer.buffer_head;
|
||||
buffer_left_size = client->disk_buffer.bytes;
|
||||
}
|
||||
if(!buffer) {
|
||||
assert(buffer_left_size == 0);
|
||||
@@ -366,23 +415,23 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
|
||||
assert(buffer->offset <= buffer->length);
|
||||
if(buffer->length == buffer->offset) {
|
||||
{
|
||||
std::lock_guard block{client->buffer.mutex};
|
||||
client->buffer.buffer_head = buffer->next;
|
||||
std::lock_guard block{client->disk_buffer.mutex};
|
||||
client->disk_buffer.buffer_head = buffer->next;
|
||||
if(!buffer->next)
|
||||
client->buffer.buffer_tail = &client->buffer.buffer_head;
|
||||
client->disk_buffer.buffer_tail = &client->disk_buffer.buffer_head;
|
||||
|
||||
assert(client->buffer.bytes >= written);
|
||||
client->buffer.bytes -= written;
|
||||
buffer_left_size = client->buffer.bytes;
|
||||
assert(client->disk_buffer.bytes >= written);
|
||||
client->disk_buffer.bytes -= written;
|
||||
buffer_left_size = client->disk_buffer.bytes;
|
||||
(void) buffer_left_size; /* trick my IDE here a bit */
|
||||
}
|
||||
|
||||
free_buffer(buffer);
|
||||
} else {
|
||||
std::lock_guard block{client->buffer.mutex};
|
||||
assert(client->buffer.bytes >= written);
|
||||
client->buffer.bytes -= written;
|
||||
buffer_left_size = client->buffer.bytes;
|
||||
std::lock_guard block{client->disk_buffer.mutex};
|
||||
assert(client->disk_buffer.bytes >= written);
|
||||
client->disk_buffer.bytes -= written;
|
||||
buffer_left_size = client->disk_buffer.bytes;
|
||||
(void) buffer_left_size; /* trick my IDE here a bit */
|
||||
}
|
||||
|
||||
@@ -390,17 +439,29 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
|
||||
}
|
||||
}
|
||||
|
||||
if(buffer_left_size > 0)
|
||||
if(buffer_left_size > 0) {
|
||||
this->enqueue_disk_io(client);
|
||||
else if(client->state == FileClient::STATE_DISCONNECTING) {
|
||||
debugMessage(LOG_FT, "{} Disk IO has been flushed.", client->log_prefix());
|
||||
} else if(client->state == FileClient::STATE_DISCONNECTING) {
|
||||
{
|
||||
std::lock_guard nb_lock{client->network_buffer.mutex};
|
||||
{
|
||||
std::lock_guard db_lock{client->disk_buffer.mutex};
|
||||
if(std::exchange(client->disk_buffer.flushed, true))
|
||||
return;
|
||||
}
|
||||
if(!client->network_buffer.flushed) {
|
||||
logTrace(LOG_FT, "{} Disk IO has been flushed, awaiting network buffer flush.", client->log_prefix());
|
||||
return;
|
||||
}
|
||||
logTrace(LOG_FT, "{} Disk IO and network buffer have been flushed.", client->log_prefix());
|
||||
}
|
||||
|
||||
std::unique_lock slock{client->state_mutex};
|
||||
client->handle->disconnect_client(client->shared_from_this(), slock, false);
|
||||
}
|
||||
|
||||
if(client->state == FileClient::STATE_TRANSFERRING && buffer_left_size < TRANSFER_MAX_CACHED_BYTES / 2) {
|
||||
if(client->buffer.buffering_stopped)
|
||||
if(client->disk_buffer.buffering_stopped)
|
||||
logMessage(LOG_FT, "{} Starting network read, buffer is capable for reading again.", client->log_prefix());
|
||||
client->add_network_read_event(false);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user