2020-05-07 15:28:15 -04:00
//
// Created by WolverinDEV on 04/05/2020.
//
# include <cassert>
# include <event.h>
# include <experimental/filesystem>
# include <log/LogUtils.h>
# include "./LocalFileProvider.h"
2020-09-16 08:15:06 -04:00
# include "./LocalFileTransfer.h"
2020-06-16 06:57:20 -04:00
# include "duration_utils.h"
2020-05-07 15:28:15 -04:00
using namespace ts : : server : : file ;
using namespace ts : : server : : file : : transfer ;
namespace fs = std : : experimental : : filesystem ;
DiskIOStartResult LocalFileTransfer : : start_disk_io ( ) {
assert ( this - > disk_io . state = = DiskIOLoopState : : STOPPED ) ;
this - > disk_io . state = DiskIOLoopState : : RUNNING ;
this - > disk_io . dispatch_thread = std : : thread ( & LocalFileTransfer : : dispatch_loop_disk_io , this ) ;
return DiskIOStartResult : : SUCCESS ;
}
void LocalFileTransfer : : shutdown_disk_io ( ) {
if ( this - > disk_io . state = = DiskIOLoopState : : STOPPED ) return ;
this - > disk_io . state = DiskIOLoopState : : STOPPING ;
{
std : : unique_lock qlock { this - > disk_io . queue_lock } ;
this - > disk_io . notify_work_awaiting . notify_all ( ) ;
2021-03-11 08:12:12 -05:00
while ( this - > disk_io . queue_head ) {
2020-05-07 15:28:15 -04:00
this - > disk_io . notify_client_processed . wait_for ( qlock , std : : chrono : : seconds { 10 } ) ;
2021-03-11 08:12:12 -05:00
}
2020-05-07 15:28:15 -04:00
if ( this - > disk_io . queue_head ) {
logWarning ( 0 , " Failed to flush disk IO. Force aborting. " ) ;
this - > disk_io . state = DiskIOLoopState : : FORCE_STOPPING ;
this - > disk_io . notify_work_awaiting . notify_all ( ) ;
this - > disk_io . notify_client_processed . wait ( qlock ) ;
}
}
if ( this - > disk_io . dispatch_thread . joinable ( ) )
this - > disk_io . dispatch_thread . join ( ) ;
this - > disk_io . state = DiskIOLoopState : : STOPPED ;
}
void LocalFileTransfer : : dispatch_loop_disk_io ( void * provider_ptr ) {
auto provider = reinterpret_cast < LocalFileTransfer * > ( provider_ptr ) ;
std : : shared_ptr < FileClient > client { } ;
while ( true ) {
{
std : : unique_lock qlock { provider - > disk_io . queue_lock } ;
if ( client ) {
client - > file . currently_processing = false ;
provider - > disk_io . notify_client_processed . notify_all ( ) ;
client = nullptr ;
}
provider - > disk_io . notify_work_awaiting . wait ( qlock , [ & ] { return provider - > disk_io . state ! = DiskIOLoopState : : RUNNING | | provider - > disk_io . queue_head ! = nullptr ; } ) ;
if ( provider - > disk_io . queue_head ) {
2020-09-16 08:15:06 -04:00
try {
client = provider - > disk_io . queue_head - > shared_from_this ( ) ;
} catch ( std : : bad_weak_ptr & ex ) {
logCritical ( LOG_FT , " Disk worker encountered a bad weak ptr. This indicated something went horribly wrong! Please submit this on https://forum.teaspeak.de !!! " ) ;
client . reset ( ) ;
continue ;
}
2020-05-07 15:28:15 -04:00
provider - > disk_io . queue_head = provider - > disk_io . queue_head - > file . next_client ;
2021-01-03 11:16:23 -05:00
if ( ! provider - > disk_io . queue_head ) {
2020-05-07 15:28:15 -04:00
provider - > disk_io . queue_tail = & provider - > disk_io . queue_head ;
2021-01-03 11:16:23 -05:00
}
2020-05-07 15:28:15 -04:00
}
if ( provider - > disk_io . state ! = DiskIOLoopState : : RUNNING ) {
if ( provider - > disk_io . state = = DiskIOLoopState : : STOPPING ) {
if ( ! client )
break ;
/* break only if all clients have been flushed */
} else {
/* force stopping without any flushing */
auto fclient = & * client ;
2021-01-03 11:16:23 -05:00
while ( fclient ) {
2020-05-07 15:28:15 -04:00
fclient = std : : exchange ( fclient - > file . next_client , nullptr ) ;
2021-01-03 11:16:23 -05:00
}
2020-05-07 15:28:15 -04:00
provider - > disk_io . queue_head = nullptr ;
provider - > disk_io . queue_tail = & provider - > disk_io . queue_head ;
break ;
}
}
2021-01-03 11:16:23 -05:00
if ( ! client ) {
2020-05-07 15:28:15 -04:00
continue ;
2021-01-03 11:16:23 -05:00
}
2020-05-07 15:28:15 -04:00
client - > file . currently_processing = true ;
client - > file . next_client = nullptr ;
}
provider - > execute_disk_io ( client ) ;
}
provider - > disk_io . notify_client_processed . notify_all ( ) ;
}
2020-06-10 12:13:14 -04:00
bool FileClient : : enqueue_disk_buffer_bytes ( const void * snd_buffer , size_t size ) {
auto tbuffer = allocate_buffer ( size ) ;
tbuffer - > length = size ;
tbuffer - > offset = 0 ;
memcpy ( tbuffer - > data , snd_buffer , size ) ;
size_t buffer_size ;
{
std : : lock_guard block { this - > disk_buffer . mutex } ;
2021-01-14 16:16:56 -05:00
if ( this - > disk_buffer . write_disconnected ) {
2020-06-11 07:08:45 -04:00
goto write_disconnected ;
2021-01-14 16:16:56 -05:00
}
2020-06-11 07:08:45 -04:00
2020-06-10 12:13:14 -04:00
* this - > disk_buffer . buffer_tail = tbuffer ;
this - > disk_buffer . buffer_tail = & tbuffer - > next ;
buffer_size = ( this - > disk_buffer . bytes + = size ) ;
}
return buffer_size > TRANSFER_MAX_CACHED_BYTES ;
2020-06-11 07:08:45 -04:00
write_disconnected :
2021-01-03 11:16:23 -05:00
deref_buffer ( tbuffer ) ;
2020-06-11 07:08:45 -04:00
return false ;
}
void FileClient : : flush_disk_buffer ( ) {
Buffer * current_head ;
{
std : : lock_guard block { this - > disk_buffer . mutex } ;
this - > disk_buffer . write_disconnected = true ;
this - > disk_buffer . bytes = 0 ;
current_head = std : : exchange ( this - > disk_buffer . buffer_head , nullptr ) ;
this - > disk_buffer . buffer_tail = & this - > disk_buffer . buffer_head ;
}
while ( current_head ) {
auto next = current_head - > next ;
2021-01-03 11:16:23 -05:00
deref_buffer ( current_head ) ;
2020-06-11 07:08:45 -04:00
current_head = next ;
}
2020-06-10 12:13:14 -04:00
}
2020-07-30 05:40:03 -04:00
bool FileClient : : buffers_flushed ( ) {
{
std : : lock_guard db_lock { this - > disk_buffer . mutex } ;
if ( this - > disk_buffer . bytes > 0 )
return false ;
}
{
std : : lock_guard nb_lock { this - > network_buffer . mutex } ;
if ( this - > network_buffer . bytes > 0 )
return false ;
}
return true ;
}
2020-05-07 15:28:15 -04:00
FileInitializeResult LocalFileTransfer : : initialize_file_io ( const std : : shared_ptr < FileClient > & transfer ) {
FileInitializeResult result { FileInitializeResult : : SUCCESS } ;
assert ( transfer - > transfer ) ;
std : : shared_lock slock { transfer - > state_mutex } ;
auto & file_data = transfer - > file ;
assert ( ! file_data . file_descriptor ) ;
assert ( ! file_data . next_client ) ;
2020-05-13 05:32:08 -04:00
auto absolute_path = transfer - > transfer - > absolute_file_path ;
2020-05-07 15:28:15 -04:00
{
unsigned int open_flags { 0 } ;
if ( transfer - > transfer - > direction = = Transfer : : DIRECTION_DOWNLOAD ) {
open_flags = O_RDONLY ;
std : : error_code fs_error { } ;
2020-05-13 05:32:08 -04:00
if ( absolute_path . empty ( ) | | ! fs : : exists ( absolute_path , fs_error ) ) {
2020-05-07 15:28:15 -04:00
result = FileInitializeResult : : FILE_DOES_NOT_EXISTS ;
goto error_exit ;
} else if ( fs_error ) {
2020-05-13 05:32:08 -04:00
logWarning ( LOG_FT , " {} Failed to check for file existence of {}: {}/{} " , transfer - > log_prefix ( ) , absolute_path , fs_error . value ( ) , fs_error . message ( ) ) ;
2020-05-07 15:28:15 -04:00
result = FileInitializeResult : : FILE_SYSTEM_ERROR ;
goto error_exit ;
}
} else if ( transfer - > transfer - > direction = = Transfer : : DIRECTION_UPLOAD ) {
2020-06-10 12:13:14 -04:00
std : : error_code fs_error { } ;
auto parent_path = fs : : u8path ( absolute_path ) . parent_path ( ) ;
if ( ! fs : : exists ( parent_path ) ) {
if ( ! fs : : create_directories ( parent_path , fs_error ) | | fs_error ) {
logError ( LOG_FT , " {} Failed to create required directories for file upload for {}: {}/{} " , transfer - > log_prefix ( ) , absolute_path , fs_error . value ( ) , fs_error . message ( ) ) ;
result = FileInitializeResult : : COUNT_NOT_CREATE_DIRECTORIES ;
goto error_exit ;
}
} else if ( fs_error ) {
logWarning ( LOG_FT , " {} Failed to check for directory existence of {}: {}/{}. Assuming it exists " , transfer - > log_prefix ( ) , parent_path . string ( ) , fs_error . value ( ) , fs_error . message ( ) ) ;
}
2020-05-07 15:28:15 -04:00
open_flags = ( unsigned ) O_WRONLY | ( unsigned ) O_CREAT ;
if ( transfer - > transfer - > override_exiting )
open_flags | = ( unsigned ) O_TRUNC ;
} else {
return FileInitializeResult : : INVALID_TRANSFER_DIRECTION ;
}
2020-05-13 05:32:08 -04:00
file_data . file_descriptor = open ( absolute_path . c_str ( ) , ( int ) open_flags , 0644 ) ;
2020-05-07 15:28:15 -04:00
if ( file_data . file_descriptor < = 0 ) {
const auto errno_ = errno ;
switch ( errno_ ) {
case EACCES :
result = FileInitializeResult : : FILE_IS_NOT_ACCESSIBLE ;
break ;
case EDQUOT :
2020-05-13 05:32:08 -04:00
logWarning ( LOG_FT , " {} Disk inode limit has been reached. Failed to start file transfer for file {} " , transfer - > log_prefix ( ) , absolute_path ) ;
2020-05-07 15:28:15 -04:00
result = FileInitializeResult : : FILE_SYSTEM_ERROR ;
break ;
case EISDIR :
result = FileInitializeResult : : FILE_IS_A_DIRECTORY ;
break ;
case EMFILE :
result = FileInitializeResult : : PROCESS_FILE_LIMIT_REACHED ;
break ;
case ENFILE :
result = FileInitializeResult : : SYSTEM_FILE_LIMIT_REACHED ;
break ;
case ETXTBSY :
result = FileInitializeResult : : FILE_IS_BUSY ;
break ;
case EROFS :
result = FileInitializeResult : : DISK_IS_READ_ONLY ;
break ;
default :
2020-06-10 12:13:14 -04:00
logWarning ( LOG_FT , " {} Failed to start file {} for file {}: {}/{} " , transfer - > log_prefix ( ) ,
transfer - > transfer - > direction = = Transfer : : DIRECTION_DOWNLOAD ? " download " : " upload " , absolute_path , errno_ , strerror ( errno_ ) ) ;
2020-05-07 15:28:15 -04:00
result = FileInitializeResult : : FILE_SYSTEM_ERROR ;
break ;
}
goto error_exit ;
}
}
2020-09-16 08:15:06 -04:00
this - > file_system_ - > lock_file ( absolute_path ) ;
2020-05-07 15:28:15 -04:00
transfer - > file . file_locked = true ;
if ( transfer - > transfer - > direction = = Transfer : : DIRECTION_UPLOAD ) {
if ( ftruncate ( file_data . file_descriptor , transfer - > transfer - > expected_file_size ) ! = 0 ) {
const auto errno_ = errno ;
switch ( errno_ ) {
case EACCES :
2020-05-13 05:32:08 -04:00
logWarning ( LOG_FT , " {} File {} got inaccessible on truncating, but not on opening. " , transfer - > log_prefix ( ) , absolute_path ) ;
2020-05-07 15:28:15 -04:00
result = FileInitializeResult : : FILE_IS_NOT_ACCESSIBLE ;
goto error_exit ;
case EFBIG :
result = FileInitializeResult : : FILE_TOO_LARGE ;
goto error_exit ;
case EIO :
2020-05-13 05:32:08 -04:00
logWarning ( LOG_FT , " {} A disk IO error occurred while resizing file {}. " , transfer - > log_prefix ( ) , absolute_path ) ;
2020-05-07 15:28:15 -04:00
result = FileInitializeResult : : FILE_IS_NOT_ACCESSIBLE ;
goto error_exit ;
case EROFS :
2020-05-13 05:32:08 -04:00
logWarning ( LOG_FT , " {} Failed to resize file {} because disk is in read only mode. " , transfer - > log_prefix ( ) , absolute_path ) ;
2020-05-07 15:28:15 -04:00
result = FileInitializeResult : : FILE_IS_NOT_ACCESSIBLE ;
goto error_exit ;
default :
2020-05-13 05:32:08 -04:00
debugMessage ( LOG_FT , " {} Failed to truncate file {}: {}/{}. Trying to upload file anyways. " , transfer - > log_prefix ( ) , absolute_path , errno_ , strerror ( errno_ ) ) ;
2020-05-07 15:28:15 -04:00
}
}
} else if ( transfer - > transfer - > direction = = Transfer : : DIRECTION_DOWNLOAD ) {
auto file_size = lseek ( file_data . file_descriptor , 0 , SEEK_END ) ;
if ( file_size ! = transfer - > transfer - > expected_file_size ) {
logWarning ( LOG_FT , " {} Expected target file to be of size {}, but file is actually of size {} " , transfer - > log_prefix ( ) , transfer - > transfer - > expected_file_size , file_size ) ;
result = FileInitializeResult : : FILE_SIZE_MISMATCH ;
goto error_exit ;
}
2020-06-10 12:13:14 -04:00
if ( transfer - > file . query_media_bytes & & file_size > 0 ) {
auto new_pos = lseek ( file_data . file_descriptor , 0 , SEEK_SET ) ;
if ( new_pos < 0 ) {
logWarning ( LOG_FT , " {} Failed to seek to file start: {}/{} " , transfer - > log_prefix ( ) , errno , strerror ( errno ) ) ;
result = FileInitializeResult : : FILE_SEEK_FAILED ;
goto error_exit ;
}
auto read = : : read ( file_data . file_descriptor , transfer - > file . media_bytes , TRANSFER_MEDIA_BYTES_LENGTH ) ;
if ( read < = 0 ) {
logWarning ( LOG_FT , " {} Failed to read file media bytes: {}/{} " , transfer - > log_prefix ( ) , errno , strerror ( errno ) ) ;
result = FileInitializeResult : : FAILED_TO_READ_MEDIA_BYTES ;
goto error_exit ;
}
transfer - > file . media_bytes_length = read ;
}
2020-05-07 15:28:15 -04:00
}
2020-06-10 12:13:14 -04:00
2020-05-07 15:28:15 -04:00
{
auto new_pos = lseek ( file_data . file_descriptor , transfer - > transfer - > file_offset , SEEK_SET ) ;
if ( new_pos < 0 ) {
logWarning ( LOG_FT , " {} Failed to seek to target file offset ({}): {}/{} " , transfer - > log_prefix ( ) , transfer - > transfer - > file_offset , errno , strerror ( errno ) ) ;
result = FileInitializeResult : : FILE_SEEK_FAILED ;
goto error_exit ;
} else if ( new_pos ! = transfer - > transfer - > file_offset ) {
logWarning ( LOG_FT , " {} File rw offset mismatch after seek. Expected {} but received {} " , transfer - > log_prefix ( ) , transfer - > transfer - > file_offset , new_pos ) ;
result = FileInitializeResult : : FILE_SEEK_FAILED ;
goto error_exit ;
}
2020-06-10 12:13:14 -04:00
logTrace ( LOG_FT , " {} Seek to file offset {}. New actual offset is {} " , transfer - > log_prefix ( ) , transfer - > transfer - > file_offset , new_pos ) ;
2020-05-07 15:28:15 -04:00
}
return FileInitializeResult : : SUCCESS ;
error_exit :
if ( std : : exchange ( transfer - > file . file_locked , false ) )
2020-09-16 08:15:06 -04:00
this - > file_system_ - > unlock_file ( absolute_path ) ;
2020-05-07 15:28:15 -04:00
if ( file_data . file_descriptor > 0 )
: : close ( file_data . file_descriptor ) ;
file_data . file_descriptor = 0 ;
return result ;
}
void LocalFileTransfer : : finalize_file_io ( const std : : shared_ptr < FileClient > & transfer ,
std : : unique_lock < std : : shared_mutex > & state_lock ) {
assert ( state_lock . owns_lock ( ) ) ;
2020-09-16 08:15:06 -04:00
if ( ! transfer - > transfer ) {
2020-06-10 12:13:14 -04:00
return ;
2020-09-16 08:15:06 -04:00
}
2020-06-10 12:13:14 -04:00
2020-05-13 05:32:08 -04:00
auto absolute_path = transfer - > transfer - > absolute_file_path ;
2020-05-07 15:28:15 -04:00
auto & file_data = transfer - > file ;
state_lock . unlock ( ) ;
{
std : : unique_lock dlock { this - > disk_io . queue_lock } ;
while ( true ) {
if ( file_data . currently_processing ) {
this - > disk_io . notify_client_processed . wait ( dlock ) ;
continue ;
}
2020-06-10 12:13:14 -04:00
if ( this - > disk_io . queue_head = = & * transfer ) {
this - > disk_io . queue_head = file_data . next_client ;
2020-09-16 08:15:06 -04:00
if ( ! this - > disk_io . queue_head ) {
2020-06-10 12:13:14 -04:00
this - > disk_io . queue_tail = & this - > disk_io . queue_head ;
2020-09-16 08:15:06 -04:00
}
2020-06-10 12:13:14 -04:00
} else if ( file_data . next_client | | this - > disk_io . queue_tail = = & file_data . next_client ) {
FileClient * head { this - > disk_io . queue_head } ;
while ( head - > file . next_client ! = & * transfer ) {
assert ( head - > file . next_client ) ;
head = head - > file . next_client ;
2020-05-07 15:28:15 -04:00
}
2020-06-10 12:13:14 -04:00
head - > file . next_client = file_data . next_client ;
if ( ! file_data . next_client )
this - > disk_io . queue_tail = & head - > file . next_client ;
}
file_data . next_client = nullptr ;
2020-05-07 15:28:15 -04:00
break ;
}
}
state_lock . lock ( ) ;
2020-09-16 08:15:06 -04:00
if ( std : : exchange ( file_data . file_locked , false ) ) {
this - > file_system_ - > unlock_file ( absolute_path ) ;
}
2020-05-07 15:28:15 -04:00
if ( file_data . file_descriptor > 0 )
: : close ( file_data . file_descriptor ) ;
file_data . file_descriptor = 0 ;
}
void LocalFileTransfer : : enqueue_disk_io ( const std : : shared_ptr < FileClient > & client ) {
2020-09-16 08:15:06 -04:00
if ( ! client - > file . file_descriptor ) {
2020-05-07 15:28:15 -04:00
return ;
2020-09-16 08:15:06 -04:00
}
2020-05-07 15:28:15 -04:00
2020-09-16 08:15:06 -04:00
if ( ! client - > transfer ) {
2020-05-07 15:28:15 -04:00
return ;
2020-09-16 08:15:06 -04:00
}
2020-05-07 15:28:15 -04:00
if ( client - > transfer - > direction = = Transfer : : DIRECTION_DOWNLOAD ) {
2020-09-16 08:15:06 -04:00
if ( client - > state ! = FileClient : : STATE_TRANSFERRING ) {
2020-05-07 15:28:15 -04:00
return ;
2020-09-16 08:15:06 -04:00
}
2020-05-07 15:28:15 -04:00
2020-09-16 08:15:06 -04:00
if ( client - > disk_buffer . bytes > TRANSFER_MAX_CACHED_BYTES ) {
2020-05-07 15:28:15 -04:00
return ;
2020-09-16 08:15:06 -04:00
}
2020-05-07 15:28:15 -04:00
} else if ( client - > transfer - > direction = = Transfer : : DIRECTION_UPLOAD ) {
/* we don't do this check because this might be a flush instruction, where the buffer is actually zero bytes filled */
/*
if ( client - > buffer . bytes = = 0 )
return ;
*/
}
std : : lock_guard dlock { this - > disk_io . queue_lock } ;
2020-09-16 08:15:06 -04:00
if ( client - > file . next_client | | this - > disk_io . queue_tail = = & client - > file . next_client ) {
2020-05-07 15:28:15 -04:00
return ;
2020-09-16 08:15:06 -04:00
}
2020-05-07 15:28:15 -04:00
* this - > disk_io . queue_tail = & * client ;
this - > disk_io . queue_tail = & client - > file . next_client ;
this - > disk_io . notify_work_awaiting . notify_all ( ) ;
}
void LocalFileTransfer : : execute_disk_io ( const std : : shared_ptr < FileClient > & client ) {
2021-01-03 11:16:23 -05:00
if ( ! client - > transfer ) {
return ;
}
2020-05-07 15:28:15 -04:00
if ( client - > transfer - > direction = = Transfer : : DIRECTION_UPLOAD ) {
2020-09-16 08:15:06 -04:00
Buffer * buffer ;
size_t buffer_left_size ;
2020-05-07 15:28:15 -04:00
while ( true ) {
{
2020-06-10 12:13:14 -04:00
std : : lock_guard block { client - > disk_buffer . mutex } ;
2021-01-03 11:16:23 -05:00
if ( ! client - > disk_buffer . buffer_head ) {
assert ( client - > disk_buffer . bytes = = 0 ) ;
buffer_left_size = 0 ;
break ;
}
2020-06-10 12:13:14 -04:00
buffer_left_size = client - > disk_buffer . bytes ;
2021-01-03 11:16:23 -05:00
buffer = ref_buffer ( client - > disk_buffer . buffer_head ) ;
2020-05-07 15:28:15 -04:00
}
assert ( buffer - > offset < buffer - > length ) ;
auto written = : : write ( client - > file . file_descriptor , buffer - > data + buffer - > offset , buffer - > length - buffer - > offset ) ;
if ( written < = 0 ) {
2021-01-03 11:16:23 -05:00
deref_buffer ( buffer ) ;
if ( errno = = EAGAIN ) {
//TODO: Timeout?
this - > enqueue_disk_io ( client ) ;
break ;
}
2020-05-07 15:28:15 -04:00
if ( written = = 0 ) {
/* EOF, how the hell is this event possible?! */
2020-05-13 05:32:08 -04:00
auto offset_written = client - > statistics . disk_bytes_write . total_bytes + client - > transfer - > file_offset ;
2020-05-07 15:28:15 -04:00
auto aoffset = lseek ( client - > file . file_descriptor , 0 , SEEK_CUR ) ;
logError ( LOG_FT , " {} Received unexpected file write EOF. EOF received at {} but expected {}. Actual file offset: {}. Closing transfer. " ,
client - > log_prefix ( ) , offset_written , client - > transfer - > expected_file_size , aoffset ) ;
2020-05-13 12:03:14 -04:00
this - > invoke_aborted_callback ( client , { TransferError : : UNEXPECTED_DISK_EOF , strerror ( errno ) } ) ;
2020-06-11 07:08:45 -04:00
client - > flush_disk_buffer ( ) ;
2020-05-07 15:28:15 -04:00
{
std : : unique_lock slock { client - > state_mutex } ;
client - > handle - > disconnect_client ( client , slock , true ) ;
}
} else {
2020-05-13 05:32:08 -04:00
auto offset_written = client - > statistics . disk_bytes_write . total_bytes + client - > transfer - > file_offset ;
2020-05-07 15:28:15 -04:00
auto aoffset = lseek ( client - > file . file_descriptor , 0 , SEEK_CUR ) ;
logError ( LOG_FT , " {} Received write to disk IO error. Write pointer is at {} of {}. Actual file offset: {}. Closing transfer. " ,
client - > log_prefix ( ) , offset_written , client - > transfer - > expected_file_size , aoffset ) ;
2020-05-13 12:03:14 -04:00
this - > invoke_aborted_callback ( client , { TransferError : : DISK_IO_ERROR , strerror ( errno ) } ) ;
2020-06-11 07:08:45 -04:00
client - > flush_disk_buffer ( ) ;
2020-05-07 15:28:15 -04:00
{
std : : unique_lock slock { client - > state_mutex } ;
client - > handle - > disconnect_client ( client , slock , true ) ;
}
}
return ;
} else {
buffer - > offset + = written ;
assert ( buffer - > offset < = buffer - > length ) ;
2021-01-03 11:16:23 -05:00
2020-05-07 15:28:15 -04:00
if ( buffer - > length = = buffer - > offset ) {
{
2020-06-10 12:13:14 -04:00
std : : lock_guard block { client - > disk_buffer . mutex } ;
2021-01-03 11:16:23 -05:00
if ( client - > disk_buffer . buffer_head = = buffer ) {
client - > disk_buffer . buffer_head = buffer - > next ;
if ( ! buffer - > next ) {
client - > disk_buffer . buffer_tail = & client - > disk_buffer . buffer_head ;
}
assert ( client - > disk_buffer . bytes > = written ) ;
client - > disk_buffer . bytes - = written ;
buffer_left_size = client - > disk_buffer . bytes ;
2021-01-04 13:45:48 -05:00
/* We have to deref the buffer twice since we've removed it from the list which owns us one reference */
/* Will not trigger a memory free since we're still holding onto one reference */
deref_buffer ( buffer ) ;
2021-01-03 11:16:23 -05:00
} else {
/* The buffer got removed */
}
}
} else {
std : : lock_guard block { client - > disk_buffer . mutex } ;
if ( client - > disk_buffer . buffer_head = = buffer ) {
2020-06-10 12:13:14 -04:00
assert ( client - > disk_buffer . bytes > = written ) ;
client - > disk_buffer . bytes - = written ;
buffer_left_size = client - > disk_buffer . bytes ;
2021-01-03 11:16:23 -05:00
} else {
/* The buffer got removed */
2020-05-07 15:28:15 -04:00
}
}
2021-01-03 11:16:23 -05:00
deref_buffer ( buffer ) ;
2020-05-13 05:32:08 -04:00
client - > statistics . disk_bytes_write . increase_bytes ( written ) ;
2020-05-07 15:28:15 -04:00
}
}
2020-06-10 12:13:14 -04:00
if ( buffer_left_size > 0 ) {
2020-05-07 15:28:15 -04:00
this - > enqueue_disk_io ( client ) ;
2020-08-01 05:34:24 -04:00
} else if ( client - > state = = FileClient : : STATE_FLUSHING ) {
2020-06-11 07:08:45 -04:00
this - > test_disconnecting_state ( client ) ;
2020-05-07 15:28:15 -04:00
}
if ( client - > state = = FileClient : : STATE_TRANSFERRING & & buffer_left_size < TRANSFER_MAX_CACHED_BYTES / 2 ) {
2021-01-03 11:16:23 -05:00
if ( client - > disk_buffer . buffering_stopped ) {
2020-05-07 15:28:15 -04:00
logMessage ( LOG_FT , " {} Starting network read, buffer is capable for reading again. " , client - > log_prefix ( ) ) ;
2021-01-03 11:16:23 -05:00
}
2020-05-07 15:28:15 -04:00
client - > add_network_read_event ( false ) ;
}
} else if ( client - > transfer - > direction = = Transfer : : DIRECTION_DOWNLOAD ) {
2020-08-01 05:34:24 -04:00
if ( client - > state = = FileClient : : STATE_FLUSHING ) {
2020-07-13 05:13:09 -04:00
client - > flush_disk_buffer ( ) ; /* just in case, file download usually don't write to the disk */
2020-06-11 07:08:45 -04:00
return ;
}
2020-05-10 10:23:02 -04:00
2020-05-07 15:28:15 -04:00
while ( true ) {
constexpr auto buffer_capacity { 4096 } ;
char buffer [ buffer_capacity ] ;
auto read = : : read ( client - > file . file_descriptor , buffer , buffer_capacity ) ;
if ( read < = 0 ) {
2021-01-03 11:16:23 -05:00
if ( errno = = EAGAIN ) {
this - > enqueue_disk_io ( client ) ;
return ;
}
2020-05-07 15:28:15 -04:00
if ( read = = 0 ) {
/* EOF */
2020-05-13 05:32:08 -04:00
auto offset_send = client - > statistics . disk_bytes_read . total_bytes + client - > transfer - > file_offset ;
2020-05-07 15:28:15 -04:00
if ( client - > transfer - > expected_file_size = = offset_send ) {
debugMessage ( LOG_FT , " {} Finished file reading. Flushing and disconnecting transfer. Reading took {} seconds. " ,
client - > log_prefix ( ) , duration_to_string ( std : : chrono : : system_clock : : now ( ) - client - > timings . key_received ) ) ;
} else {
auto aoffset = lseek ( client - > file . file_descriptor , 0 , SEEK_CUR ) ;
logError ( LOG_FT , " {} Received unexpected read EOF. EOF received at {} but expected {}. Actual file offset: {}. Disconnecting client. " ,
client - > log_prefix ( ) , offset_send , client - > transfer - > expected_file_size , aoffset ) ;
2020-05-13 12:03:14 -04:00
this - > invoke_aborted_callback ( client , { TransferError : : UNEXPECTED_DISK_EOF , " " } ) ;
2020-05-07 15:28:15 -04:00
}
} else {
2020-05-13 05:32:08 -04:00
logWarning ( LOG_FT , " {} Failed to read from file {} ({}/{}). Aborting transfer. " , client - > log_prefix ( ) , client - > transfer - > absolute_file_path , errno , strerror ( errno ) ) ;
2020-05-07 15:28:15 -04:00
2020-05-13 12:03:14 -04:00
this - > invoke_aborted_callback ( client , { TransferError : : DISK_IO_ERROR , strerror ( errno ) } ) ;
2020-05-07 15:28:15 -04:00
}
2020-06-11 07:08:45 -04:00
std : : unique_lock slock { client - > state_mutex } ;
client - > handle - > disconnect_client ( client , slock , true ) ;
2020-05-07 15:28:15 -04:00
return ;
} else {
auto buffer_full = client - > send_file_bytes ( buffer , read ) ;
2020-05-13 05:32:08 -04:00
client - > statistics . disk_bytes_read . increase_bytes ( read ) ;
client - > statistics . file_transferred . increase_bytes ( read ) ;
2020-05-07 15:28:15 -04:00
std : : shared_lock slock { client - > state_mutex } ;
if ( buffer_full ) {
2020-05-13 05:32:08 -04:00
//logTrace(LOG_FT, "{} Stopping buffering from disk. Buffer full ({}bytes)", client->log_prefix(), client->buffer.bytes);
2020-05-07 15:28:15 -04:00
break ;
}
/* we've stuff to write again, yeahr */
client - > add_network_write_event ( false ) ;
}
}
} else {
logError ( LOG_FT , " {} Disk IO scheduled, but transfer direction is unknown. " , client - > log_prefix ( ) ) ;
}
}