diff --git a/file/local_server/LocalFileProvider.cpp b/file/local_server/LocalFileProvider.cpp index 12202bc..5ba1b40 100644 --- a/file/local_server/LocalFileProvider.cpp +++ b/file/local_server/LocalFileProvider.cpp @@ -5,6 +5,8 @@ #include #include #include "LocalFileProvider.h" +#include "LocalFileSystem.h" +#include "LocalFileTransfer.h" using namespace ts::server; using LocalFileServer = file::LocalFileProvider; @@ -72,36 +74,42 @@ std::shared_ptr file::server() { return server_instance; } -LocalFileServer::LocalFileProvider() : file_system_{}, file_transfer_{this->file_system_} {} -LocalFileServer::~LocalFileProvider() = default; +LocalFileServer::LocalFileProvider() { + this->file_system_ = new filesystem::LocalFileSystem(); + this->file_transfer_ = new transfer::LocalFileTransfer(this->file_system_); +} +LocalFileServer::~LocalFileProvider() { + delete this->file_transfer_; + delete this->file_system_; +}; bool LocalFileServer::initialize(std::string &error) { - if(!this->file_system_.initialize(error, "files/")) + if(!this->file_system_->initialize(error, "files/")) return false; - if(!this->file_transfer_.start()) { + if(!this->file_transfer_->start()) { error = "transfer server startup failed"; - this->file_system_.finalize(); + this->file_system_->finalize(); return false; } return true; } void LocalFileServer::finalize() { - this->file_transfer_.stop(); - this->file_system_.finalize(); + this->file_transfer_->stop(); + this->file_system_->finalize(); } file::filesystem::AbstractProvider &LocalFileServer::file_system() { - return this->file_system_; + return *this->file_system_; } file::transfer::AbstractProvider &LocalFileServer::file_transfer() { - return this->file_transfer_; + return *this->file_transfer_; } std::string file::LocalFileProvider::file_base_path() const { - return this->file_system_.root_path(); + return this->file_system_->root_path(); } std::shared_ptr LocalFileServer::register_server(ServerId server_id) { diff --git a/file/local_server/LocalFileProvider.h b/file/local_server/LocalFileProvider.h index bafa002..05db4a6 100644 --- a/file/local_server/LocalFileProvider.h +++ b/file/local_server/LocalFileProvider.h @@ -14,526 +14,9 @@ #include #include "./NetTools.h" -#define TRANSFER_MAX_CACHED_BYTES (1024 * 1024 * 1) // Buffer up to 1mb - namespace ts::server::file { - namespace filesystem { -#ifdef FS_INCLUDED - namespace fs = std::experimental::filesystem; -#endif - - class LocalFileSystem : public filesystem::AbstractProvider { - using FileModifyError = filesystem::FileModifyError; - using DirectoryModifyError = filesystem::DirectoryModifyError; - public: - enum struct FileCategory { - ICON, - AVATAR, - CHANNEL - }; - - virtual ~LocalFileSystem(); - - bool initialize(std::string & /* error */, const std::string & /* root path */); - void finalize(); - - void lock_file(const std::string& /* absolute path */); - void unlock_file(const std::string& /* absolute path */); - - [[nodiscard]] inline const auto &root_path() const { return this->root_path_; } - - [[nodiscard]] std::string absolute_avatar_path(const std::shared_ptr &, const std::string&); - [[nodiscard]] std::string absolute_icon_path(const std::shared_ptr &, const std::string&); - [[nodiscard]] std::string absolute_channel_path(const std::shared_ptr &, ChannelId, const std::string&); - - std::shared_ptr> initialize_server(const std::shared_ptr & /* server */) override; - std::shared_ptr> delete_server(const std::shared_ptr & /* server */) override; - - std::shared_ptr> - query_channel_info(const std::shared_ptr & /* server */, const std::vector>& /* names */) override; - - std::shared_ptr - query_channel_directory(const std::shared_ptr & id, ChannelId channelId, const std::string &string) override; - - std::shared_ptr> - create_channel_directory(const std::shared_ptr & id, ChannelId channelId, const std::string &string) override; - - std::shared_ptr> - delete_channel_files(const std::shared_ptr & id, ChannelId channelId, const std::vector &string) override; - - std::shared_ptr> - rename_channel_file(const std::shared_ptr & id, ChannelId channelId, const std::string &, ChannelId, const std::string &) override; - - std::shared_ptr> - query_icon_info(const std::shared_ptr & /* server */, const std::vector& /* names */) override; - - std::shared_ptr query_icon_directory(const std::shared_ptr & id) override; - - std::shared_ptr> - delete_icons(const std::shared_ptr & id, const std::vector &string) override; - - std::shared_ptr> - query_avatar_info(const std::shared_ptr & /* server */, const std::vector& /* names */) override; - - std::shared_ptr query_avatar_directory(const std::shared_ptr & id) override; - - std::shared_ptr> - delete_avatars(const std::shared_ptr & id, const std::vector &string) override; - - private: -#ifdef FS_INCLUDED - [[nodiscard]] fs::path server_path(const std::shared_ptr &); - [[nodiscard]] fs::path server_channel_path(const std::shared_ptr &, ChannelId); - [[nodiscard]] static bool exceeds_base_path(const fs::path& /* base */, const fs::path& /* target */); - [[nodiscard]] bool is_any_file_locked(const fs::path& /* base */, const std::string& /* path */, std::string& /* file (relative to the base) */); - - [[nodiscard]] std::shared_ptr> - delete_files(const fs::path& /* base */, const std::vector &string); - - [[nodiscard]] std::shared_ptr - query_directory(const fs::path& /* base */, const std::string &string, bool); - - [[nodiscard]] std::shared_ptr> - query_file_info(const std::vector> &string); -#endif - - template - std::shared_ptr> create_execute_response() { - return std::make_shared>(this->result_notify_mutex, this->result_notify_cv); - } - std::string target_file_path(FileCategory type, const std::shared_ptr &sid, ts::ChannelId cid, const std::string &path); - - std::mutex result_notify_mutex{}; - std::condition_variable result_notify_cv{}; - - std::string root_path_{}; - - std::mutex locked_files_mutex{}; - std::deque locked_files_{}; - }; - } - - namespace transfer { - class LocalFileTransfer; - - struct Buffer { - Buffer* next{nullptr}; - - size_t capacity{0}; - size_t length{0}; - size_t offset{0}; - - char data[1]{}; - }; - [[nodiscard]] extern Buffer* allocate_buffer(size_t); - extern void free_buffer(Buffer*); - - /* all variables are locked via the state_mutex */ - struct FileClient : std::enable_shared_from_this { - LocalFileTransfer* handle; - std::shared_ptr transfer{nullptr}; - - std::shared_mutex state_mutex{}; - enum { - STATE_AWAITING_KEY, /* includes SSL/HTTP init */ - STATE_TRANSFERRING, - STATE_FLUSHING, - STATE_DISCONNECTED - } state{STATE_AWAITING_KEY}; - - bool finished_signal_send{false}; - - enum NetworkingProtocol { - PROTOCOL_UNKNOWN, - PROTOCOL_HTTPS, - PROTOCOL_TS_V1 - }; - - enum HTTPUploadState { - HTTP_AWAITING_HEADER, - HTTP_STATE_AWAITING_BOUNDARY, - HTTP_STATE_AWAITING_BOUNDARY_END, - HTTP_STATE_UPLOADING, - HTTP_STATE_DOWNLOADING - }; - - struct { - bool file_locked{false}; - int file_descriptor{0}; - - bool currently_processing{false}; - FileClient* next_client{nullptr}; - - bool query_media_bytes{false}; - uint8_t media_bytes[TRANSFER_MEDIA_BYTES_LENGTH]{0}; - uint8_t media_bytes_length{0}; - } file{}; - - struct { - size_t provided_bytes{0}; - char key[TRANSFER_KEY_LENGTH]{0}; - } transfer_key{}; - - struct { - std::mutex mutex{}; - size_t bytes{0}; - - bool buffering_stopped{false}; - bool write_disconnected{false}; - - Buffer* buffer_head{nullptr}; - Buffer** buffer_tail{&buffer_head}; - } network_buffer{}; - - struct { - std::mutex mutex{}; - size_t bytes{0}; - - bool buffering_stopped{false}; - bool write_disconnected{false}; - - Buffer* buffer_head{nullptr}; - Buffer** buffer_tail{&buffer_head}; - } disk_buffer{}; - - struct { - sockaddr_storage address{}; - int file_descriptor{0}; - - NetworkingProtocol protocol{PROTOCOL_UNKNOWN}; - - struct event* event_read{nullptr}; - struct event* event_write{nullptr}; - struct event* event_throttle{nullptr}; - - bool add_event_write{false}, add_event_read{false}; - - std::chrono::system_clock::time_point disconnect_timeout{}; - - networking::NetworkThrottle client_throttle{}; - /* the right side is the server throttle */ - networking::DualNetworkThrottle throttle{&client_throttle, &networking::NetworkThrottle::kNoThrottle}; - - pipes::SSL pipe_ssl{}; - bool pipe_ssl_init{false}; - std::unique_ptr http_header_buffer{nullptr, free_buffer}; - HTTPUploadState http_state{HTTPUploadState::HTTP_AWAITING_HEADER}; - - std::string http_boundary{}; - - /* Only read the transfer key length at the beginning. We than have the actual limit which will be set via throttle */ - size_t max_read_buffer_size{TRANSFER_KEY_LENGTH}; - } networking{}; - - struct { - networking::TransferStatistics network_send{}; - networking::TransferStatistics network_received{}; - - networking::TransferStatistics file_transferred{}; - - networking::TransferStatistics disk_bytes_read{}; - networking::TransferStatistics disk_bytes_write{}; - } statistics{}; - - struct { - std::chrono::system_clock::time_point last_write{}; - std::chrono::system_clock::time_point last_read{}; - - std::chrono::system_clock::time_point connected{}; - std::chrono::system_clock::time_point key_received{}; - std::chrono::system_clock::time_point disconnecting{}; - } timings; - - explicit FileClient(LocalFileTransfer* handle) : handle{handle} { memtrack::allocated(this); } - ~FileClient(); - - void add_network_write_event(bool /* ignore bandwidth limits */); - void add_network_write_event_nolock(bool /* ignore bandwidth limits */); - - /* will check if we've enough space in out read buffer again */ - void add_network_read_event(bool /* ignore bandwidth limits */); - - bool send_file_bytes(const void* /* buffer */, size_t /* length */); - bool enqueue_network_buffer_bytes(const void* /* buffer */, size_t /* length */); - bool enqueue_disk_buffer_bytes(const void* /* buffer */, size_t /* length */); - - /* these function clear the buffers and set the write disconnected flags to true so no new buffers will be enqueued */ - size_t flush_network_buffer(); - void flush_disk_buffer(); - - [[nodiscard]] bool buffers_flushed(); - [[nodiscard]] inline std::string log_prefix() const { return "[" + net::to_string(this->networking.address) + "]"; } - }; - - enum struct DiskIOStartResult { - SUCCESS, - OUT_OF_MEMORY - }; - - enum struct NetworkingStartResult { - SUCCESS, - OUT_OF_MEMORY - }; - - enum struct NetworkingBindResult { - SUCCESS, - - BINDING_ALREADY_EXISTS, - NETWORKING_NOT_INITIALIZED, - FAILED_TO_ALLOCATE_SOCKET, /* errno is set */ - FAILED_TO_BIND, - FAILED_TO_LISTEN, - - OUT_OF_MEMORY, - }; - - enum struct NetworkingUnbindResult { - SUCCESS, - UNKNOWN_BINDING - }; - - enum struct ClientWorkerStartResult { - SUCCESS - }; - - enum struct NetworkInitializeResult { - SUCCESS, - OUT_OF_MEMORY - }; - - enum struct FileInitializeResult { - SUCCESS, - - INVALID_TRANSFER_DIRECTION, - OUT_OF_MEMORY, - - PROCESS_FILE_LIMIT_REACHED, - SYSTEM_FILE_LIMIT_REACHED, - - FILE_IS_BUSY, - FILE_DOES_NOT_EXISTS, - FILE_SYSTEM_ERROR, - FILE_IS_A_DIRECTORY, - - FILE_TOO_LARGE, - DISK_IS_READ_ONLY, - - FILE_SEEK_FAILED, - FILE_SIZE_MISMATCH, - - FILE_IS_NOT_ACCESSIBLE, - - FAILED_TO_READ_MEDIA_BYTES, - COUNT_NOT_CREATE_DIRECTORIES, - - MAX - }; - - constexpr static std::array kFileInitializeResultMessages{ - /* SUCCESS */ "success", - - /* INVALID_TRANSFER_DIRECTION */ "invalid file transfer direction", - /* OUT_OF_MEMORY */ "out of memory", - - /* PROCESS_FILE_LIMIT_REACHED */ "process file limit reached", - /* SYSTEM_FILE_LIMIT_REACHED */ "system file limit reached", - - /* FILE_IS_BUSY */ "target file is busy", - /* FILE_DOES_NOT_EXISTS */ "target file does not exists", - /* FILE_SYSTEM_ERROR */ "internal file system error", - /* FILE_IS_A_DIRECTORY */ "target file is a directory", - - /* FILE_TOO_LARGE */ "file is too large", - /* DISK_IS_READ_ONLY */ "disk is in read only mode", - - /* FILE_SEEK_FAILED */ "failed to seek to target file offset", - /* FILE_SIZE_MISMATCH */ "file size miss match", - /* FILE_IS_NOT_ACCESSIBLE */ "file is not accessible", - /* FAILED_TO_READ_MEDIA_BYTES */ "failed to read file media bytes", - /* COUNT_NOT_CREATE_DIRECTORIES */ "could not create required directories" - }; - - enum struct TransferKeyApplyResult { - SUCCESS, - FILE_ERROR, - UNKNOWN_KEY, - - INTERNAL_ERROR - }; - - enum struct TransferUploadRawResult { - MORE_DATA_TO_RECEIVE, - FINISH, - FINISH_OVERFLOW, - - /* UNKNOWN ERROR */ - }; - - enum struct TransferUploadHTTPResult { - MORE_DATA_TO_RECEIVE, - FINISH, - - BOUNDARY_MISSING, - BOUNDARY_TOKEN_INVALID, - BOUNDARY_INVALID, - - MISSING_CONTENT_TYPE, - INVALID_CONTENT_TYPE - /* UNKNOWN ERROR */ - }; - - struct NetworkBinding { - std::string hostname{}; - sockaddr_storage address{}; - }; - - struct ActiveNetworkBinding : std::enable_shared_from_this { - std::string hostname{}; - sockaddr_storage address{}; - - int file_descriptor{-1}; - struct event* accept_event{nullptr}; - - LocalFileTransfer* handle{nullptr}; - }; - - class LocalFileTransfer : public AbstractProvider { - public: - explicit LocalFileTransfer(filesystem::LocalFileSystem&); - ~LocalFileTransfer(); - - [[nodiscard]] bool start(); - void stop(); - - [[nodiscard]] NetworkingBindResult add_network_binding(const NetworkBinding& /* binding */); - [[nodiscard]] std::vector active_network_bindings(); - [[nodiscard]] NetworkingUnbindResult remove_network_binding(const NetworkBinding& /* binding */); - - std::shared_ptr>> - initialize_channel_transfer(Transfer::Direction direction, const std::shared_ptr& server, ChannelId channelId, - const TransferInfo &info) override; - - std::shared_ptr>> - initialize_icon_transfer(Transfer::Direction direction, const std::shared_ptr& server, const TransferInfo &info) override; - - std::shared_ptr>> - initialize_avatar_transfer(Transfer::Direction direction, const std::shared_ptr& server, const TransferInfo &info) override; - - std::shared_ptr>> list_transfer() override; - - std::shared_ptr> stop_transfer(const std::shared_ptr& /* server */, transfer_id id, bool) override; - private: - enum struct DiskIOLoopState { - STOPPED, - RUNNING, - - STOPPING, - FORCE_STOPPING - }; - filesystem::LocalFileSystem& file_system_; - - size_t max_concurrent_transfers{1024}; - std::mt19937 transfer_random_token_generator{std::random_device{}()}; - - std::mutex result_notify_mutex{}; - std::condition_variable result_notify_cv{}; - - std::mutex transfers_mutex{}; - std::mutex transfer_create_mutex{}; - std::deque> transfers_{}; - std::deque> pending_transfers{}; - - enum ServerState { - STOPPED, - RUNNING - } state{ServerState::STOPPED}; - - struct { - bool active{false}; - - std::thread dispatch_thread{}; - std::mutex mutex{}; - std::condition_variable notify_cv{}; - } disconnect; - - struct { - std::mutex mutex; - - bool active{false}; - std::thread dispatch_thread{}; - struct event_base* event_base{nullptr}; - - std::deque> bindings{}; - } network{}; - - struct { - DiskIOLoopState state{DiskIOLoopState::STOPPED}; - std::thread dispatch_thread{}; - std::mutex queue_lock{}; - std::condition_variable notify_work_awaiting{}; - std::condition_variable notify_client_processed{}; - - FileClient* queue_head{nullptr}; - FileClient** queue_tail{&queue_head}; - } disk_io{}; - - template - std::shared_ptr> create_execute_response() { - return std::make_shared>(this->result_notify_mutex, this->result_notify_cv); - } - - std::shared_ptr>> - initialize_transfer(Transfer::Direction, const std::shared_ptr &, ChannelId, Transfer::TargetType, const TransferInfo &info); - - [[nodiscard]] NetworkingStartResult start_networking(); - [[nodiscard]] DiskIOStartResult start_disk_io(); - [[nodiscard]] ClientWorkerStartResult start_client_worker(); - - void shutdown_networking(); - void shutdown_disk_io(); - void shutdown_client_worker(); - - void disconnect_client(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */, bool /* flush network */); - - [[nodiscard]] NetworkInitializeResult initialize_networking(const std::shared_ptr& /* client */, int /* file descriptor */); - /* might block 'till all IO operations have been succeeded */ - void finalize_networking(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */); - - [[nodiscard]] FileInitializeResult initialize_file_io(const std::shared_ptr& /* client */); - void finalize_file_io(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */); - - [[nodiscard]] bool initialize_client_ssl(const std::shared_ptr& /* client */); - void finalize_client_ssl(const std::shared_ptr& /* client */); - - void enqueue_disk_io(const std::shared_ptr& /* client */); - void execute_disk_io(const std::shared_ptr& /* client */); - - void test_disconnecting_state(const std::shared_ptr& /* client */); - - [[nodiscard]] TransferUploadRawResult handle_transfer_upload_raw(const std::shared_ptr& /* client */, const char * /* buffer */, size_t /* length */, size_t* /* bytes written */); - [[nodiscard]] TransferUploadHTTPResult handle_transfer_upload_http(const std::shared_ptr& /* client */, const char * /* buffer */, size_t /* length */); - - void send_http_response(const std::shared_ptr& /* client */, http::HttpResponse& /* response */); - - static void callback_transfer_network_write(int, short, void*); - static void callback_transfer_network_read(int, short, void*); - static void callback_transfer_network_throttle(int, short, void*); - static void callback_transfer_network_accept(int, short, void*); - - static void dispatch_loop_client_worker(void*); - static void dispatch_loop_network(void*); - static void dispatch_loop_disk_io(void*); - - void report_transfer_statistics(const std::shared_ptr& /* client */); - [[nodiscard]] TransferStatistics generate_transfer_statistics_report(const std::shared_ptr& /* client */); - void invoke_aborted_callback(const std::shared_ptr& /* client */, const TransferError& /* error */); - void invoke_aborted_callback(const std::shared_ptr& /* pending transfer */, const TransferError& /* error */); - - size_t handle_transfer_read(const std::shared_ptr& /* client */, const char* /* buffer */, size_t /* bytes */); - size_t handle_transfer_read_raw(const std::shared_ptr& /* client */, const char* /* buffer */, size_t /* bytes */); - [[nodiscard]] TransferKeyApplyResult handle_transfer_key_provided(const std::shared_ptr& /* client */, std::string& /* error */); - }; - } + namespace filesystem { class LocalFileSystem; } + namespace transfer { class LocalFileTransfer; } class LocalVirtualFileServer : public VirtualFileServer { public: @@ -563,7 +46,7 @@ namespace ts::server::file { std::shared_ptr register_server(ServerId /* server id */) override; void unregister_server(ServerId /* server id */) override; private: - filesystem::LocalFileSystem file_system_; - transfer::LocalFileTransfer file_transfer_; + filesystem::LocalFileSystem* file_system_; + transfer::LocalFileTransfer* file_transfer_; }; } \ No newline at end of file diff --git a/file/local_server/LocalFileSystem.cpp b/file/local_server/LocalFileSystem.cpp index 3b682f3..fb1113a 100644 --- a/file/local_server/LocalFileSystem.cpp +++ b/file/local_server/LocalFileSystem.cpp @@ -5,8 +5,8 @@ #define FS_INCLUDED #include -#include "LocalFileProvider.h" -#include "clnpath.h" +#include "./LocalFileSystem.h" +#include "./clnpath.h" using namespace ts::server::file; using namespace ts::server::file::filesystem; diff --git a/file/local_server/LocalFileSystem.h b/file/local_server/LocalFileSystem.h new file mode 100644 index 0000000..9fbe11b --- /dev/null +++ b/file/local_server/LocalFileSystem.h @@ -0,0 +1,117 @@ +// +// Created by WolverinDEV on 16/09/2020. +// + +#pragma once + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "./NetTools.h" + +namespace ts::server::file::filesystem { +#ifdef FS_INCLUDED + namespace fs = std::experimental::filesystem; +#endif + + class LocalFileSystem : public filesystem::AbstractProvider { + using FileModifyError = filesystem::FileModifyError; + using DirectoryModifyError = filesystem::DirectoryModifyError; + public: + enum struct FileCategory { + ICON, + AVATAR, + CHANNEL + }; + + virtual ~LocalFileSystem(); + + bool initialize(std::string & /* error */, const std::string & /* root path */); + void finalize(); + + void lock_file(const std::string& /* absolute path */); + void unlock_file(const std::string& /* absolute path */); + + [[nodiscard]] inline const auto &root_path() const { return this->root_path_; } + + [[nodiscard]] std::string absolute_avatar_path(const std::shared_ptr &, const std::string&); + [[nodiscard]] std::string absolute_icon_path(const std::shared_ptr &, const std::string&); + [[nodiscard]] std::string absolute_channel_path(const std::shared_ptr &, ChannelId, const std::string&); + + std::shared_ptr> initialize_server(const std::shared_ptr & /* server */) override; + std::shared_ptr> delete_server(const std::shared_ptr & /* server */) override; + + std::shared_ptr> + query_channel_info(const std::shared_ptr & /* server */, const std::vector>& /* names */) override; + + std::shared_ptr + query_channel_directory(const std::shared_ptr & id, ChannelId channelId, const std::string &string) override; + + std::shared_ptr> + create_channel_directory(const std::shared_ptr & id, ChannelId channelId, const std::string &string) override; + + std::shared_ptr> + delete_channel_files(const std::shared_ptr & id, ChannelId channelId, const std::vector &string) override; + + std::shared_ptr> + rename_channel_file(const std::shared_ptr & id, ChannelId channelId, const std::string &, ChannelId, const std::string &) override; + + std::shared_ptr> + query_icon_info(const std::shared_ptr & /* server */, const std::vector& /* names */) override; + + std::shared_ptr query_icon_directory(const std::shared_ptr & id) override; + + std::shared_ptr> + delete_icons(const std::shared_ptr & id, const std::vector &string) override; + + std::shared_ptr> + query_avatar_info(const std::shared_ptr & /* server */, const std::vector& /* names */) override; + + std::shared_ptr query_avatar_directory(const std::shared_ptr & id) override; + + std::shared_ptr> + delete_avatars(const std::shared_ptr & id, const std::vector &string) override; + + private: +#ifdef FS_INCLUDED + [[nodiscard]] fs::path server_path(const std::shared_ptr &); + [[nodiscard]] fs::path server_channel_path(const std::shared_ptr &, ChannelId); + [[nodiscard]] static bool exceeds_base_path(const fs::path& /* base */, const fs::path& /* target */); + [[nodiscard]] bool is_any_file_locked(const fs::path& /* base */, const std::string& /* path */, std::string& /* file (relative to the base) */); + + [[nodiscard]] std::shared_ptr> + delete_files(const fs::path& /* base */, const std::vector &string); + + [[nodiscard]] std::shared_ptr + query_directory(const fs::path& /* base */, const std::string &string, bool); + + [[nodiscard]] std::shared_ptr> + query_file_info(const std::vector> &string); +#endif + + template + std::shared_ptr> create_execute_response() { + return std::make_shared>(this->result_notify_mutex, this->result_notify_cv); + } + std::string target_file_path(FileCategory type, const std::shared_ptr &sid, ts::ChannelId cid, const std::string &path); + + std::mutex result_notify_mutex{}; + std::condition_variable result_notify_cv{}; + + std::string root_path_{}; + + std::mutex locked_files_mutex{}; + std::deque locked_files_{}; + }; +} \ No newline at end of file diff --git a/file/local_server/LocalFileTransfer.cpp b/file/local_server/LocalFileTransfer.cpp index 2fc5b47..ff147b7 100644 --- a/file/local_server/LocalFileTransfer.cpp +++ b/file/local_server/LocalFileTransfer.cpp @@ -7,6 +7,7 @@ #include #include #include "./LocalFileProvider.h" +#include "./LocalFileTransfer.h" #include namespace fs = std::experimental::filesystem; @@ -44,7 +45,7 @@ FileClient::~FileClient() { memtrack::freed(this); } -LocalFileTransfer::LocalFileTransfer(filesystem::LocalFileSystem &fs) : file_system_{fs} {} +LocalFileTransfer::LocalFileTransfer(filesystem::LocalFileSystem *fs) : file_system_{fs} {} LocalFileTransfer::~LocalFileTransfer() = default; bool LocalFileTransfer::start() { @@ -184,13 +185,13 @@ std::shared_ptr>> L std::string absolute_path{}; switch (transfer->target_type) { case Transfer::TARGET_TYPE_AVATAR: - absolute_path = this->file_system_.absolute_avatar_path(transfer->server, transfer->target_file_path); + absolute_path = this->file_system_->absolute_avatar_path(transfer->server, transfer->target_file_path); break; case Transfer::TARGET_TYPE_ICON: - absolute_path = this->file_system_.absolute_icon_path(transfer->server, transfer->target_file_path); + absolute_path = this->file_system_->absolute_icon_path(transfer->server, transfer->target_file_path); break; case Transfer::TARGET_TYPE_CHANNEL_FILE: - absolute_path = this->file_system_.absolute_channel_path(transfer->server, transfer->channel_id, transfer->target_file_path); + absolute_path = this->file_system_->absolute_channel_path(transfer->server, transfer->channel_id, transfer->target_file_path); break; case Transfer::TARGET_TYPE_UNKNOWN: default: @@ -199,7 +200,7 @@ std::shared_ptr>> L } transfer->absolute_file_path = absolute_path; - const auto root_path_length = this->file_system_.root_path().size(); + const auto root_path_length = this->file_system_->root_path().size(); if(root_path_length < absolute_path.size()) transfer->relative_file_path = absolute_path.substr(root_path_length); else diff --git a/file/local_server/LocalFileTransfer.h b/file/local_server/LocalFileTransfer.h new file mode 100644 index 0000000..2188f79 --- /dev/null +++ b/file/local_server/LocalFileTransfer.h @@ -0,0 +1,446 @@ +// +// Created by WolverinDEV on 16/09/2020. +// + +#pragma once + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "./NetTools.h" +#include "LocalFileSystem.h" + +#define TRANSFER_MAX_CACHED_BYTES (1024 * 1024 * 1) // Buffer up to 1mb + +namespace ts::server::file::transfer { + class LocalFileTransfer; + + struct Buffer { + Buffer* next{nullptr}; + + size_t capacity{0}; + size_t length{0}; + size_t offset{0}; + + char data[1]{}; + }; + [[nodiscard]] extern Buffer* allocate_buffer(size_t); + extern void free_buffer(Buffer*); + + /* all variables are locked via the state_mutex */ + struct FileClient : std::enable_shared_from_this { + LocalFileTransfer* handle; + std::shared_ptr transfer{nullptr}; + + std::shared_mutex state_mutex{}; + enum { + STATE_AWAITING_KEY, /* includes SSL/HTTP init */ + STATE_TRANSFERRING, + STATE_FLUSHING, + STATE_DISCONNECTED + } state{STATE_AWAITING_KEY}; + + bool finished_signal_send{false}; + + enum NetworkingProtocol { + PROTOCOL_UNKNOWN, + PROTOCOL_HTTPS, + PROTOCOL_TS_V1 + }; + + enum HTTPUploadState { + HTTP_AWAITING_HEADER, + HTTP_STATE_AWAITING_BOUNDARY, + HTTP_STATE_AWAITING_BOUNDARY_END, + HTTP_STATE_UPLOADING, + HTTP_STATE_DOWNLOADING + }; + + struct { + bool file_locked{false}; + int file_descriptor{0}; + + bool currently_processing{false}; + FileClient* next_client{nullptr}; + + bool query_media_bytes{false}; + uint8_t media_bytes[TRANSFER_MEDIA_BYTES_LENGTH]{0}; + uint8_t media_bytes_length{0}; + } file{}; + + struct { + size_t provided_bytes{0}; + char key[TRANSFER_KEY_LENGTH]{0}; + } transfer_key{}; + + struct { + std::mutex mutex{}; + size_t bytes{0}; + + bool buffering_stopped{false}; + bool write_disconnected{false}; + + Buffer* buffer_head{nullptr}; + Buffer** buffer_tail{&buffer_head}; + } network_buffer{}; + + struct { + std::mutex mutex{}; + size_t bytes{0}; + + bool buffering_stopped{false}; + bool write_disconnected{false}; + + Buffer* buffer_head{nullptr}; + Buffer** buffer_tail{&buffer_head}; + } disk_buffer{}; + + struct { + sockaddr_storage address{}; + int file_descriptor{0}; + + NetworkingProtocol protocol{PROTOCOL_UNKNOWN}; + + struct event* event_read{nullptr}; + struct event* event_write{nullptr}; + struct event* event_throttle{nullptr}; + + bool add_event_write{false}, add_event_read{false}; + + std::chrono::system_clock::time_point disconnect_timeout{}; + + networking::NetworkThrottle client_throttle{}; + /* the right side is the server throttle */ + networking::DualNetworkThrottle throttle{&client_throttle, &networking::NetworkThrottle::kNoThrottle}; + + pipes::SSL pipe_ssl{}; + bool pipe_ssl_init{false}; + std::unique_ptr http_header_buffer{nullptr, free_buffer}; + HTTPUploadState http_state{HTTPUploadState::HTTP_AWAITING_HEADER}; + + std::string http_boundary{}; + + /* Only read the transfer key length at the beginning. We than have the actual limit which will be set via throttle */ + size_t max_read_buffer_size{TRANSFER_KEY_LENGTH}; + } networking{}; + + struct { + networking::TransferStatistics network_send{}; + networking::TransferStatistics network_received{}; + + networking::TransferStatistics file_transferred{}; + + networking::TransferStatistics disk_bytes_read{}; + networking::TransferStatistics disk_bytes_write{}; + } statistics{}; + + struct { + std::chrono::system_clock::time_point last_write{}; + std::chrono::system_clock::time_point last_read{}; + + std::chrono::system_clock::time_point connected{}; + std::chrono::system_clock::time_point key_received{}; + std::chrono::system_clock::time_point disconnecting{}; + } timings; + + explicit FileClient(LocalFileTransfer* handle) : handle{handle} { memtrack::allocated(this); } + ~FileClient(); + + void add_network_write_event(bool /* ignore bandwidth limits */); + void add_network_write_event_nolock(bool /* ignore bandwidth limits */); + + /* will check if we've enough space in out read buffer again */ + void add_network_read_event(bool /* ignore bandwidth limits */); + + bool send_file_bytes(const void* /* buffer */, size_t /* length */); + bool enqueue_network_buffer_bytes(const void* /* buffer */, size_t /* length */); + bool enqueue_disk_buffer_bytes(const void* /* buffer */, size_t /* length */); + + /* these function clear the buffers and set the write disconnected flags to true so no new buffers will be enqueued */ + size_t flush_network_buffer(); + void flush_disk_buffer(); + + [[nodiscard]] bool buffers_flushed(); + [[nodiscard]] inline std::string log_prefix() const { return "[" + net::to_string(this->networking.address) + "]"; } + }; + + enum struct DiskIOStartResult { + SUCCESS, + OUT_OF_MEMORY + }; + + enum struct NetworkingStartResult { + SUCCESS, + OUT_OF_MEMORY + }; + + enum struct NetworkingBindResult { + SUCCESS, + + BINDING_ALREADY_EXISTS, + NETWORKING_NOT_INITIALIZED, + FAILED_TO_ALLOCATE_SOCKET, /* errno is set */ + FAILED_TO_BIND, + FAILED_TO_LISTEN, + + OUT_OF_MEMORY, + }; + + enum struct NetworkingUnbindResult { + SUCCESS, + UNKNOWN_BINDING + }; + + enum struct ClientWorkerStartResult { + SUCCESS + }; + + enum struct NetworkInitializeResult { + SUCCESS, + OUT_OF_MEMORY + }; + + enum struct FileInitializeResult { + SUCCESS, + + INVALID_TRANSFER_DIRECTION, + OUT_OF_MEMORY, + + PROCESS_FILE_LIMIT_REACHED, + SYSTEM_FILE_LIMIT_REACHED, + + FILE_IS_BUSY, + FILE_DOES_NOT_EXISTS, + FILE_SYSTEM_ERROR, + FILE_IS_A_DIRECTORY, + + FILE_TOO_LARGE, + DISK_IS_READ_ONLY, + + FILE_SEEK_FAILED, + FILE_SIZE_MISMATCH, + + FILE_IS_NOT_ACCESSIBLE, + + FAILED_TO_READ_MEDIA_BYTES, + COUNT_NOT_CREATE_DIRECTORIES, + + MAX + }; + + constexpr static std::array kFileInitializeResultMessages{ + /* SUCCESS */ "success", + + /* INVALID_TRANSFER_DIRECTION */ "invalid file transfer direction", + /* OUT_OF_MEMORY */ "out of memory", + + /* PROCESS_FILE_LIMIT_REACHED */ "process file limit reached", + /* SYSTEM_FILE_LIMIT_REACHED */ "system file limit reached", + + /* FILE_IS_BUSY */ "target file is busy", + /* FILE_DOES_NOT_EXISTS */ "target file does not exists", + /* FILE_SYSTEM_ERROR */ "internal file system error", + /* FILE_IS_A_DIRECTORY */ "target file is a directory", + + /* FILE_TOO_LARGE */ "file is too large", + /* DISK_IS_READ_ONLY */ "disk is in read only mode", + + /* FILE_SEEK_FAILED */ "failed to seek to target file offset", + /* FILE_SIZE_MISMATCH */ "file size miss match", + /* FILE_IS_NOT_ACCESSIBLE */ "file is not accessible", + /* FAILED_TO_READ_MEDIA_BYTES */ "failed to read file media bytes", + /* COUNT_NOT_CREATE_DIRECTORIES */ "could not create required directories" + }; + + enum struct TransferKeyApplyResult { + SUCCESS, + FILE_ERROR, + UNKNOWN_KEY, + + INTERNAL_ERROR + }; + + enum struct TransferUploadRawResult { + MORE_DATA_TO_RECEIVE, + FINISH, + FINISH_OVERFLOW, + + /* UNKNOWN ERROR */ + }; + + enum struct TransferUploadHTTPResult { + MORE_DATA_TO_RECEIVE, + FINISH, + + BOUNDARY_MISSING, + BOUNDARY_TOKEN_INVALID, + BOUNDARY_INVALID, + + MISSING_CONTENT_TYPE, + INVALID_CONTENT_TYPE + /* UNKNOWN ERROR */ + }; + + struct NetworkBinding { + std::string hostname{}; + sockaddr_storage address{}; + }; + + struct ActiveNetworkBinding : std::enable_shared_from_this { + std::string hostname{}; + sockaddr_storage address{}; + + int file_descriptor{-1}; + struct event* accept_event{nullptr}; + + LocalFileTransfer* handle{nullptr}; + }; + + class LocalFileTransfer : public AbstractProvider { + public: + explicit LocalFileTransfer(filesystem::LocalFileSystem*); + ~LocalFileTransfer(); + + [[nodiscard]] bool start(); + void stop(); + + [[nodiscard]] NetworkingBindResult add_network_binding(const NetworkBinding& /* binding */); + [[nodiscard]] std::vector active_network_bindings(); + [[nodiscard]] NetworkingUnbindResult remove_network_binding(const NetworkBinding& /* binding */); + + std::shared_ptr>> + initialize_channel_transfer(Transfer::Direction direction, const std::shared_ptr& server, ChannelId channelId, + const TransferInfo &info) override; + + std::shared_ptr>> + initialize_icon_transfer(Transfer::Direction direction, const std::shared_ptr& server, const TransferInfo &info) override; + + std::shared_ptr>> + initialize_avatar_transfer(Transfer::Direction direction, const std::shared_ptr& server, const TransferInfo &info) override; + + std::shared_ptr>> list_transfer() override; + + std::shared_ptr> stop_transfer(const std::shared_ptr& /* server */, transfer_id id, bool) override; + private: + enum struct DiskIOLoopState { + STOPPED, + RUNNING, + + STOPPING, + FORCE_STOPPING + }; + filesystem::LocalFileSystem* file_system_; + + size_t max_concurrent_transfers{1024}; + std::mt19937 transfer_random_token_generator{std::random_device{}()}; + + std::mutex result_notify_mutex{}; + std::condition_variable result_notify_cv{}; + + std::mutex transfers_mutex{}; + std::mutex transfer_create_mutex{}; + std::deque> transfers_{}; + std::deque> pending_transfers{}; + + enum ServerState { + STOPPED, + RUNNING + } state{ServerState::STOPPED}; + + struct { + bool active{false}; + + std::thread dispatch_thread{}; + std::mutex mutex{}; + std::condition_variable notify_cv{}; + } disconnect; + + struct { + std::mutex mutex; + + bool active{false}; + std::thread dispatch_thread{}; + struct event_base* event_base{nullptr}; + + std::deque> bindings{}; + } network{}; + + struct { + DiskIOLoopState state{DiskIOLoopState::STOPPED}; + std::thread dispatch_thread{}; + std::mutex queue_lock{}; + std::condition_variable notify_work_awaiting{}; + std::condition_variable notify_client_processed{}; + + FileClient* queue_head{nullptr}; + FileClient** queue_tail{&queue_head}; + } disk_io{}; + + template + std::shared_ptr> create_execute_response() { + return std::make_shared>(this->result_notify_mutex, this->result_notify_cv); + } + + std::shared_ptr>> + initialize_transfer(Transfer::Direction, const std::shared_ptr &, ChannelId, Transfer::TargetType, const TransferInfo &info); + + [[nodiscard]] NetworkingStartResult start_networking(); + [[nodiscard]] DiskIOStartResult start_disk_io(); + [[nodiscard]] ClientWorkerStartResult start_client_worker(); + + void shutdown_networking(); + void shutdown_disk_io(); + void shutdown_client_worker(); + + void disconnect_client(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */, bool /* flush network */); + + [[nodiscard]] NetworkInitializeResult initialize_networking(const std::shared_ptr& /* client */, int /* file descriptor */); + /* might block 'till all IO operations have been succeeded */ + void finalize_networking(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */); + + [[nodiscard]] FileInitializeResult initialize_file_io(const std::shared_ptr& /* client */); + void finalize_file_io(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */); + + [[nodiscard]] bool initialize_client_ssl(const std::shared_ptr& /* client */); + void finalize_client_ssl(const std::shared_ptr& /* client */); + + void enqueue_disk_io(const std::shared_ptr& /* client */); + void execute_disk_io(const std::shared_ptr& /* client */); + + void test_disconnecting_state(const std::shared_ptr& /* client */); + + [[nodiscard]] TransferUploadRawResult handle_transfer_upload_raw(const std::shared_ptr& /* client */, const char * /* buffer */, size_t /* length */, size_t* /* bytes written */); + [[nodiscard]] TransferUploadHTTPResult handle_transfer_upload_http(const std::shared_ptr& /* client */, const char * /* buffer */, size_t /* length */); + + void send_http_response(const std::shared_ptr& /* client */, http::HttpResponse& /* response */); + + static void callback_transfer_network_write(int, short, void*); + static void callback_transfer_network_read(int, short, void*); + static void callback_transfer_network_throttle(int, short, void*); + static void callback_transfer_network_accept(int, short, void*); + + static void dispatch_loop_client_worker(void*); + static void dispatch_loop_network(void*); + static void dispatch_loop_disk_io(void*); + + void report_transfer_statistics(const std::shared_ptr& /* client */); + [[nodiscard]] TransferStatistics generate_transfer_statistics_report(const std::shared_ptr& /* client */); + void invoke_aborted_callback(const std::shared_ptr& /* client */, const TransferError& /* error */); + void invoke_aborted_callback(const std::shared_ptr& /* pending transfer */, const TransferError& /* error */); + + size_t handle_transfer_read(const std::shared_ptr& /* client */, const char* /* buffer */, size_t /* bytes */); + size_t handle_transfer_read_raw(const std::shared_ptr& /* client */, const char* /* buffer */, size_t /* bytes */); + [[nodiscard]] TransferKeyApplyResult handle_transfer_key_provided(const std::shared_ptr& /* client */, std::string& /* error */); + }; +} \ No newline at end of file diff --git a/file/local_server/LocalFileTransferClientWorker.cpp b/file/local_server/LocalFileTransferClientWorker.cpp index ea89ebb..0dbf1d2 100644 --- a/file/local_server/LocalFileTransferClientWorker.cpp +++ b/file/local_server/LocalFileTransferClientWorker.cpp @@ -6,6 +6,7 @@ #include #include #include "./LocalFileProvider.h" +#include "./LocalFileTransfer.h" using namespace ts::server::file; using namespace ts::server::file::transfer; @@ -113,7 +114,7 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) { continue; } - provider->report_transfer_statistics(transfer->shared_from_this()); + provider->report_transfer_statistics(transfer); } } @@ -193,9 +194,14 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) { { std::unique_lock slock{client->state_mutex}; client->state = FileClient::STATE_DISCONNECTED; - provider->finalize_file_io(client, slock); - provider->finalize_client_ssl(client); + /* + * First of all disconnect the client from the network so no actions could be triggered by that way. + * Secondly finalize all network components, so no data is pending anywhere + * Thirdly drop the client's disk worker (if it's an upload the data should be written already, else we don't care anyways) + */ provider->finalize_networking(client, slock); + provider->finalize_client_ssl(client); + provider->finalize_file_io(client, slock); } debugMessage(LOG_FT, "{} Destroying transfer.", client->log_prefix()); diff --git a/file/local_server/LocalFileTransferDisk.cpp b/file/local_server/LocalFileTransferDisk.cpp index 6e81652..f08a894 100644 --- a/file/local_server/LocalFileTransferDisk.cpp +++ b/file/local_server/LocalFileTransferDisk.cpp @@ -7,7 +7,7 @@ #include #include #include "./LocalFileProvider.h" -#include "LocalFileProvider.h" +#include "./LocalFileTransfer.h" #include "duration_utils.h" using namespace ts::server::file; @@ -62,7 +62,13 @@ void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) { provider->disk_io.notify_work_awaiting.wait(qlock, [&]{ return provider->disk_io.state != DiskIOLoopState::RUNNING || provider->disk_io.queue_head != nullptr; }); if(provider->disk_io.queue_head) { - client = provider->disk_io.queue_head->shared_from_this(); + try { + client = provider->disk_io.queue_head->shared_from_this(); + } catch (std::bad_weak_ptr& ex) { + logCritical(LOG_FT, "Disk worker encountered a bad weak ptr. This indicated something went horribly wrong! Please submit this on https://forum.teaspeak.de !!!"); + client.reset(); + continue; + } provider->disk_io.queue_head = provider->disk_io.queue_head->file.next_client; if(!provider->disk_io.queue_head) @@ -236,7 +242,7 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr } } - this->file_system_.lock_file(absolute_path); + this->file_system_->lock_file(absolute_path); transfer->file.file_locked = true; if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) { @@ -307,7 +313,7 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr return FileInitializeResult::SUCCESS; error_exit: if(std::exchange(transfer->file.file_locked, false)) - this->file_system_.unlock_file(absolute_path); + this->file_system_->unlock_file(absolute_path); if(file_data.file_descriptor > 0) ::close(file_data.file_descriptor); @@ -319,8 +325,9 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr void LocalFileTransfer::finalize_file_io(const std::shared_ptr &transfer, std::unique_lock &state_lock) { assert(state_lock.owns_lock()); - if(!transfer->transfer) + if(!transfer->transfer) { return; + } auto absolute_path = transfer->transfer->absolute_file_path; @@ -337,8 +344,9 @@ void LocalFileTransfer::finalize_file_io(const std::shared_ptr &tran if(this->disk_io.queue_head == &*transfer) { this->disk_io.queue_head = file_data.next_client; - if (!this->disk_io.queue_head) + if (!this->disk_io.queue_head) { this->disk_io.queue_tail = &this->disk_io.queue_head; + } } else if(file_data.next_client || this->disk_io.queue_tail == &file_data.next_client) { FileClient* head{this->disk_io.queue_head}; while(head->file.next_client != &*transfer) { @@ -356,8 +364,9 @@ void LocalFileTransfer::finalize_file_io(const std::shared_ptr &tran } state_lock.lock(); - if(std::exchange(file_data.file_locked, false)) - this->file_system_.unlock_file(absolute_path); + if(std::exchange(file_data.file_locked, false)) { + this->file_system_->unlock_file(absolute_path); + } if(file_data.file_descriptor > 0) ::close(file_data.file_descriptor); @@ -365,18 +374,22 @@ void LocalFileTransfer::finalize_file_io(const std::shared_ptr &tran } void LocalFileTransfer::enqueue_disk_io(const std::shared_ptr &client) { - if(!client->file.file_descriptor) + if(!client->file.file_descriptor) { return; + } - if(!client->transfer) + if(!client->transfer) { return; + } if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { - if(client->state != FileClient::STATE_TRANSFERRING) + if(client->state != FileClient::STATE_TRANSFERRING) { return; + } - if(client->disk_buffer.bytes > TRANSFER_MAX_CACHED_BYTES) + if(client->disk_buffer.bytes > TRANSFER_MAX_CACHED_BYTES) { return; + } } else if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) { /* we don't do this check because this might be a flush instruction, where the buffer is actually zero bytes filled */ /* @@ -386,8 +399,9 @@ void LocalFileTransfer::enqueue_disk_io(const std::shared_ptr &clien } std::lock_guard dlock{this->disk_io.queue_lock}; - if(client->file.next_client || this->disk_io.queue_tail == &client->file.next_client) + if(client->file.next_client || this->disk_io.queue_tail == &client->file.next_client) { return; + } *this->disk_io.queue_tail = &*client; this->disk_io.queue_tail = &client->file.next_client; @@ -399,8 +413,8 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien if(!client->transfer) return; if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) { - Buffer* buffer{nullptr}; - size_t buffer_left_size{0}; + Buffer* buffer; + size_t buffer_left_size; while(true) { { diff --git a/file/local_server/LocalFileTransferNetwork.cpp b/file/local_server/LocalFileTransferNetwork.cpp index 11a7a6f..50c6d99 100644 --- a/file/local_server/LocalFileTransferNetwork.cpp +++ b/file/local_server/LocalFileTransferNetwork.cpp @@ -11,8 +11,8 @@ #include #include "./LocalFileProvider.h" #include "./duration_utils.h" -#include "HTTPUtils.h" -#include "LocalFileProvider.h" +#include "./HTTPUtils.h" +#include "./LocalFileTransfer.h" #if defined(TCP_CORK) && !defined(TCP_NOPUSH) #define TCP_NOPUSH TCP_CORK @@ -549,6 +549,13 @@ void LocalFileTransfer::callback_transfer_network_throttle(int, short, void *ptr void LocalFileTransfer::callback_transfer_network_read(int fd, short events, void *ptr_transfer) { auto transfer = reinterpret_cast(ptr_transfer); + std::shared_ptr client{}; + try { + client = transfer->shared_from_this(); + } catch (std::bad_weak_ptr& ex) { + logCritical(LOG_FT, "Network read worker encountered a bad weak ptr to a client. This indicated something went horribly wrong! Please submit this on https://forum.teaspeak.de !!!"); + return; + } if((unsigned) events & (unsigned) EV_TIMEOUT) { /* should never happen, receive timeouts are done via the client tick */ @@ -569,7 +576,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi if(read == 0) { std::unique_lock slock{transfer->state_mutex}; auto original_state = transfer->state; - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true); + transfer->handle->disconnect_client(client, slock, true); slock.unlock(); switch(original_state) { @@ -588,7 +595,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer->expected_file_size - transfer->transfer->file_offset); } - transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" }); + transfer->handle->invoke_aborted_callback(client, { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" }); break; } case FileClient::STATE_FLUSHING: @@ -607,7 +614,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi std::unique_lock slock{transfer->state_mutex}; auto original_state = transfer->state; - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true); + transfer->handle->disconnect_client(client, slock, true); slock.unlock(); switch(original_state) { @@ -627,7 +634,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi } - transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) }); + transfer->handle->invoke_aborted_callback(client, { TransferError::NETWORK_IO_ERROR, strerror(errno) }); break; case FileClient::STATE_FLUSHING: case FileClient::STATE_DISCONNECTED: @@ -649,10 +656,10 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi size_t bytes_buffered{0}; if(transfer->state == FileClient::STATE_AWAITING_KEY) { - bytes_buffered = transfer->handle->handle_transfer_read_raw(transfer->shared_from_this(), buffer, read); + bytes_buffered = transfer->handle->handle_transfer_read_raw(client, buffer, read); } else if(transfer->state == FileClient::STATE_TRANSFERRING) { if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) { - bytes_buffered = transfer->handle->handle_transfer_read_raw(transfer->shared_from_this(), buffer, read); + bytes_buffered = transfer->handle->handle_transfer_read_raw(client, buffer, read); } else { debugMessage(LOG_FT, "{} Received {} bytes without any need. Dropping them.", transfer->log_prefix(), read); } @@ -678,6 +685,13 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi void LocalFileTransfer::callback_transfer_network_write(int fd, short events, void *ptr_transfer) { auto transfer = reinterpret_cast(ptr_transfer); + std::shared_ptr client{}; + try { + client = transfer->shared_from_this(); + } catch (std::bad_weak_ptr& ex) { + logCritical(LOG_FT, "Network write worker encountered a bad weak ptr to a client. This indicated something went horribly wrong! Please submit this on https://forum.teaspeak.de !!!"); + return; + } if((unsigned) events & (unsigned) EV_TIMEOUT) { if(transfer->state == FileClient::STATE_FLUSHING) { @@ -693,11 +707,11 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo if(!std::exchange(transfer->finished_signal_send, true)) { if(transfer->transfer) { - transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, "failed to flush outgoing buffer" }); + transfer->handle->invoke_aborted_callback(client, { TransferError::NETWORK_IO_ERROR, "failed to flush outgoing buffer" }); } } - transfer->handle->test_disconnecting_state(transfer->shared_from_this()); + transfer->handle->test_disconnecting_state(client); return; } } @@ -736,12 +750,12 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo logError(LOG_FT, "{} Client disconnected unexpectedly on write. Send {} bytes out of {}.", transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1); - transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" }); + transfer->handle->invoke_aborted_callback(client, { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" }); } else { logError(LOG_FT, "{} Received network write error. Send {} bytes out of {}. Closing transfer.", transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1); - transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) }); + transfer->handle->invoke_aborted_callback(client, { TransferError::NETWORK_IO_ERROR, strerror(errno) }); } } else if(transfer->state == FileClient::STATE_FLUSHING && transfer->transfer) { { @@ -753,12 +767,12 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo transfer->flush_network_buffer(); if(written == 0) { logError(LOG_FT, "{} Received unexpected client disconnect while flushing the network buffer. Transfer failed.", transfer->log_prefix()); - transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" }); + transfer->handle->invoke_aborted_callback(client, { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" }); } else { logError(LOG_FT, "{} Received network write error while flushing the network buffer. Closing transfer.", transfer->log_prefix()); - transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) }); + transfer->handle->invoke_aborted_callback(client, { TransferError::NETWORK_IO_ERROR, strerror(errno) }); } } @@ -773,7 +787,7 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo std::unique_lock slock{transfer->state_mutex}; /* no need to flush anything here, write will only be invoked on a client download */ - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false); + transfer->handle->disconnect_client(client, slock, false); return; } else { buffer->offset += written; @@ -808,19 +822,19 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo if(buffer_left_size > 0) { transfer->add_network_write_event(false); } else if(transfer->state == FileClient::STATE_FLUSHING) { - transfer->handle->test_disconnecting_state(transfer->shared_from_this()); + transfer->handle->test_disconnecting_state(client); if(!std::exchange(transfer->finished_signal_send, true)) { if(transfer->transfer && transfer->statistics.file_transferred.total_bytes + transfer->transfer->file_offset == transfer->transfer->expected_file_size) { logMessage(LOG_FT, "{} Finished file transfer within {}. Closing connection.", transfer->log_prefix(), duration_to_string(std::chrono::system_clock::now() - transfer->timings.key_received)); - transfer->handle->report_transfer_statistics(transfer->shared_from_this()); + transfer->handle->report_transfer_statistics(client); if(auto callback{transfer->handle->callback_transfer_finished}; callback) callback(transfer->transfer); } } return; } - transfer->handle->enqueue_disk_io(transfer->shared_from_this()); + transfer->handle->enqueue_disk_io(client); } } @@ -845,7 +859,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptrlog_prefix()); std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, true); + client->handle->disconnect_client(client, slock, true); return (size_t) -1; } @@ -853,7 +867,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptrlog_prefix()); std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, true); + client->handle->disconnect_client(client, slock, true); return (size_t) -1; } @@ -923,7 +937,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptrstate_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, true); + client->handle->disconnect_client(client, slock, true); return (size_t) -1; } @@ -1066,7 +1080,7 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr this->send_http_response(client, response); if(response.code->code != 200 || !client->transfer) { std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, true); + client->handle->disconnect_client(client, slock, true); return (size_t) -1; } @@ -1080,7 +1094,7 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr logError(LOG_FT, "{} Protocol variable contains invalid protocol for awaiting key state. Disconnecting client.", client->log_prefix()); std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, true); + client->handle->disconnect_client(client, slock, true); return (size_t) -1; } } else if(client->state == FileClient::STATE_TRANSFERRING) { @@ -1104,7 +1118,7 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr callback(client->transfer); std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, true); + client->handle->disconnect_client(client, slock, true); return client->network_buffer.bytes; /* a bit unexact but the best we could get away with it */ } @@ -1144,7 +1158,7 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr client->handle->send_http_response(client, response); std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, true); + client->handle->disconnect_client(client, slock, true); return (size_t) -1; } else if(client->networking.protocol == FileClient::PROTOCOL_TS_V1) { @@ -1165,7 +1179,7 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr callback(client->transfer); std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, true); + client->handle->disconnect_client(client, slock, true); return (size_t) -1; } diff --git a/git-teaspeak b/git-teaspeak index 61e4c6b..b36b1e1 160000 --- a/git-teaspeak +++ b/git-teaspeak @@ -1 +1 @@ -Subproject commit 61e4c6bc67e72843bfeef09108db58fd79babf1e +Subproject commit b36b1e19aeb8bd5026437d2bb0b4d91c480ed65b diff --git a/server/src/client/command_handler/server.cpp b/server/src/client/command_handler/server.cpp index b166ba9..e53da16 100644 --- a/server/src/client/command_handler/server.cpp +++ b/server/src/client/command_handler/server.cpp @@ -314,8 +314,9 @@ command_result ConnectedClient::handleCommandServerGroupAdd(Command &cmd) { ACTION_REQUIRES_GLOBAL_PERMISSION(permission::b_serverinstance_modify_templates, 1); log_group_type = log::GroupType::TEMPLATE; } else { - if(!this->server) + if(!this->server) { return command_result{error::parameter_invalid, "you cant create normal groups on the template server!"}; + } log_group_type = log::GroupType::NORMAL; }