#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include "./NetTools.h" #define TRANSFER_MAX_CACHED_BYTES (1024 * 1024 * 1) // Buffer up to 1mb namespace ts::server::file { namespace filesystem { #ifdef FS_INCLUDED namespace fs = std::experimental::filesystem; #endif class LocalFileSystem : public filesystem::AbstractProvider { using FileModifyError = filesystem::FileModifyError; using DirectoryModifyError = filesystem::DirectoryModifyError; public: enum struct FileCategory { ICON, AVATAR, CHANNEL }; virtual ~LocalFileSystem(); bool initialize(std::string & /* error */, const std::string & /* root path */); void finalize(); void lock_file(const std::string& /* absolute path */); void unlock_file(const std::string& /* absolute path */); [[nodiscard]] inline const auto &root_path() const { return this->root_path_; } [[nodiscard]] std::string absolute_avatar_path(const std::shared_ptr &, const std::string&); [[nodiscard]] std::string absolute_icon_path(const std::shared_ptr &, const std::string&); [[nodiscard]] std::string absolute_channel_path(const std::shared_ptr &, ChannelId, const std::string&); std::shared_ptr> initialize_server(const std::shared_ptr & /* server */) override; std::shared_ptr> delete_server(const std::shared_ptr & /* server */) override; std::shared_ptr> query_channel_info(const std::shared_ptr & /* server */, const std::vector>& /* names */) override; std::shared_ptr query_channel_directory(const std::shared_ptr & id, ChannelId channelId, const std::string &string) override; std::shared_ptr> create_channel_directory(const std::shared_ptr & id, ChannelId channelId, const std::string &string) override; std::shared_ptr> delete_channel_files(const std::shared_ptr & id, ChannelId channelId, const std::vector &string) override; std::shared_ptr> rename_channel_file(const std::shared_ptr & id, ChannelId channelId, const std::string &, ChannelId, const std::string &) override; std::shared_ptr> query_icon_info(const std::shared_ptr & /* server */, const std::vector& /* names */) override; std::shared_ptr query_icon_directory(const std::shared_ptr & id) override; std::shared_ptr> delete_icons(const std::shared_ptr & id, const std::vector &string) override; std::shared_ptr> query_avatar_info(const std::shared_ptr & /* server */, const std::vector& /* names */) override; std::shared_ptr query_avatar_directory(const std::shared_ptr & id) override; std::shared_ptr> delete_avatars(const std::shared_ptr & id, const std::vector &string) override; private: #ifdef FS_INCLUDED [[nodiscard]] fs::path server_path(const std::shared_ptr &); [[nodiscard]] fs::path server_channel_path(const std::shared_ptr &, ChannelId); [[nodiscard]] static bool exceeds_base_path(const fs::path& /* base */, const fs::path& /* target */); [[nodiscard]] bool is_any_file_locked(const fs::path& /* base */, const std::string& /* path */, std::string& /* file (relative to the base) */); [[nodiscard]] std::shared_ptr> delete_files(const fs::path& /* base */, const std::vector &string); [[nodiscard]] std::shared_ptr query_directory(const fs::path& /* base */, const std::string &string, bool); [[nodiscard]] std::shared_ptr> query_file_info(const std::vector> &string); #endif template std::shared_ptr> create_execute_response() { return std::make_shared>(this->result_notify_mutex, this->result_notify_cv); } std::string target_file_path(FileCategory type, const std::shared_ptr &sid, ts::ChannelId cid, const std::string &path); std::mutex result_notify_mutex{}; std::condition_variable result_notify_cv{}; std::string root_path_{}; std::mutex locked_files_mutex{}; std::deque locked_files_{}; }; } namespace transfer { class LocalFileTransfer; struct Buffer { Buffer* next{nullptr}; size_t capacity{0}; size_t length{0}; size_t offset{0}; char data[1]{}; }; [[nodiscard]] extern Buffer* allocate_buffer(size_t); extern void free_buffer(Buffer*); /* all variables are locked via the state_mutex */ struct FileClient : std::enable_shared_from_this { LocalFileTransfer* handle; std::shared_ptr transfer{nullptr}; std::shared_mutex state_mutex{}; enum { STATE_AWAITING_KEY, /* includes SSL/HTTP init */ STATE_TRANSFERRING, STATE_DISCONNECTING, STATE_DISCONNECTED } state{STATE_AWAITING_KEY}; bool finished_signal_send{false}; enum NetworkingProtocol { PROTOCOL_UNKNOWN, PROTOCOL_HTTPS, PROTOCOL_TS_V1 }; enum HTTPUploadState { HTTP_AWAITING_HEADER, HTTP_STATE_AWAITING_BOUNDARY, HTTP_STATE_AWAITING_BOUNDARY_END, HTTP_STATE_UPLOADING, HTTP_STATE_DOWNLOADING }; struct { bool file_locked{false}; int file_descriptor{0}; bool currently_processing{false}; FileClient* next_client{nullptr}; bool query_media_bytes{false}; uint8_t media_bytes[TRANSFER_MEDIA_BYTES_LENGTH]{0}; uint8_t media_bytes_length{0}; } file{}; struct { size_t provided_bytes{0}; char key[TRANSFER_KEY_LENGTH]{0}; } transfer_key{}; struct { std::mutex mutex{}; size_t bytes{0}; bool buffering_stopped{false}; bool write_disconnected{false}; Buffer* buffer_head{nullptr}; Buffer** buffer_tail{&buffer_head}; } network_buffer{}; struct { std::mutex mutex{}; size_t bytes{0}; bool buffering_stopped{false}; bool write_disconnected{false}; Buffer* buffer_head{nullptr}; Buffer** buffer_tail{&buffer_head}; } disk_buffer{}; struct { sockaddr_storage address{}; int file_descriptor{0}; NetworkingProtocol protocol{PROTOCOL_UNKNOWN}; struct event* event_read{nullptr}; struct event* event_write{nullptr}; struct event* event_throttle{nullptr}; bool add_event_write{false}, add_event_read{false}; std::chrono::system_clock::time_point disconnect_timeout{}; networking::NetworkThrottle client_throttle{}; /* the right side is the server throttle */ networking::DualNetworkThrottle throttle{&client_throttle, &networking::NetworkThrottle::kNoThrottle}; pipes::SSL pipe_ssl{}; bool pipe_ssl_init{false}; std::unique_ptr http_header_buffer{nullptr, free_buffer}; HTTPUploadState http_state{HTTPUploadState::HTTP_AWAITING_HEADER}; std::string http_boundary{}; /* Only read the transfer key length at the beginning. We than have the actual limit which will be set via throttle */ size_t max_read_buffer_size{TRANSFER_KEY_LENGTH}; } networking{}; struct { networking::TransferStatistics network_send{}; networking::TransferStatistics network_received{}; networking::TransferStatistics file_transferred{}; networking::TransferStatistics disk_bytes_read{}; networking::TransferStatistics disk_bytes_write{}; } statistics{}; struct { std::chrono::system_clock::time_point last_write{}; std::chrono::system_clock::time_point last_read{}; std::chrono::system_clock::time_point connected{}; std::chrono::system_clock::time_point key_received{}; std::chrono::system_clock::time_point disconnecting{}; } timings; explicit FileClient(LocalFileTransfer* handle) : handle{handle} { memtrack::allocated(this); } ~FileClient(); void add_network_write_event(bool /* ignore bandwidth limits */); void add_network_write_event_nolock(bool /* ignore bandwidth limits */); /* will check if we've enough space in out read buffer again */ void add_network_read_event(bool /* ignore bandwidth limits */); bool send_file_bytes(const void* /* buffer */, size_t /* length */); bool enqueue_network_buffer_bytes(const void* /* buffer */, size_t /* length */); bool enqueue_disk_buffer_bytes(const void* /* buffer */, size_t /* length */); /* these function clear the buffers and set the write disconnected flags to true so no new buffers will be enqueued */ void flush_network_buffer(); void flush_disk_buffer(); [[nodiscard]] inline std::string log_prefix() const { return "[" + net::to_string(this->networking.address) + "]"; } }; enum struct DiskIOStartResult { SUCCESS, OUT_OF_MEMORY }; enum struct NetworkingStartResult { SUCCESS, OUT_OF_MEMORY }; enum struct NetworkingBindResult { SUCCESS, BINDING_ALREADY_EXISTS, NETWORKING_NOT_INITIALIZED, FAILED_TO_ALLOCATE_SOCKET, /* errno is set */ FAILED_TO_BIND, FAILED_TO_LISTEN, OUT_OF_MEMORY, }; enum struct NetworkingUnbindResult { SUCCESS, UNKNOWN_BINDING }; enum struct ClientWorkerStartResult { SUCCESS }; enum struct NetworkInitializeResult { SUCCESS, OUT_OF_MEMORY }; enum struct FileInitializeResult { SUCCESS, INVALID_TRANSFER_DIRECTION, OUT_OF_MEMORY, PROCESS_FILE_LIMIT_REACHED, SYSTEM_FILE_LIMIT_REACHED, FILE_IS_BUSY, FILE_DOES_NOT_EXISTS, FILE_SYSTEM_ERROR, FILE_IS_A_DIRECTORY, FILE_TOO_LARGE, DISK_IS_READ_ONLY, FILE_SEEK_FAILED, FILE_SIZE_MISMATCH, FILE_IS_NOT_ACCESSIBLE, FAILED_TO_READ_MEDIA_BYTES, COUNT_NOT_CREATE_DIRECTORIES, MAX }; constexpr static std::array kFileInitializeResultMessages{ /* SUCCESS */ "success", /* INVALID_TRANSFER_DIRECTION */ "invalid file transfer direction", /* OUT_OF_MEMORY */ "out of memory", /* PROCESS_FILE_LIMIT_REACHED */ "process file limit reached", /* SYSTEM_FILE_LIMIT_REACHED */ "system file limit reached", /* FILE_IS_BUSY */ "target file is busy", /* FILE_DOES_NOT_EXISTS */ "target file does not exists", /* FILE_SYSTEM_ERROR */ "internal file system error", /* FILE_IS_A_DIRECTORY */ "target file is a directory", /* FILE_TOO_LARGE */ "file is too large", /* DISK_IS_READ_ONLY */ "disk is in read only mode", /* FILE_SEEK_FAILED */ "failed to seek to target file offset", /* FILE_SIZE_MISMATCH */ "file size miss match", /* FILE_IS_NOT_ACCESSIBLE */ "file is not accessible", /* FAILED_TO_READ_MEDIA_BYTES */ "failed to read file media bytes", /* COUNT_NOT_CREATE_DIRECTORIES */ "could not create required directories" }; enum struct TransferKeyApplyResult { SUCCESS, FILE_ERROR, UNKNOWN_KEY, INTERNAL_ERROR }; enum struct TransferUploadRawResult { MORE_DATA_TO_RECEIVE, FINISH, FINISH_OVERFLOW, /* UNKNOWN ERROR */ }; enum struct TransferUploadHTTPResult { MORE_DATA_TO_RECEIVE, FINISH, BOUNDARY_MISSING, BOUNDARY_TOKEN_INVALID, BOUNDARY_INVALID, MISSING_CONTENT_TYPE, INVALID_CONTENT_TYPE /* UNKNOWN ERROR */ }; struct NetworkBinding { std::string hostname{}; sockaddr_storage address{}; }; struct ActiveNetworkBinding : std::enable_shared_from_this { std::string hostname{}; sockaddr_storage address{}; int file_descriptor{-1}; struct event* accept_event{nullptr}; LocalFileTransfer* handle{nullptr}; }; class LocalFileTransfer : public AbstractProvider { public: explicit LocalFileTransfer(filesystem::LocalFileSystem&); ~LocalFileTransfer(); [[nodiscard]] bool start(); void stop(); [[nodiscard]] NetworkingBindResult add_network_binding(const NetworkBinding& /* binding */); [[nodiscard]] std::vector active_network_bindings(); [[nodiscard]] NetworkingUnbindResult remove_network_binding(const NetworkBinding& /* binding */); std::shared_ptr>> initialize_channel_transfer(Transfer::Direction direction, const std::shared_ptr& server, ChannelId channelId, const TransferInfo &info) override; std::shared_ptr>> initialize_icon_transfer(Transfer::Direction direction, const std::shared_ptr& server, const TransferInfo &info) override; std::shared_ptr>> initialize_avatar_transfer(Transfer::Direction direction, const std::shared_ptr& server, const TransferInfo &info) override; std::shared_ptr>> list_transfer() override; std::shared_ptr> stop_transfer(const std::shared_ptr& /* server */, transfer_id id, bool) override; private: enum struct DiskIOLoopState { STOPPED, RUNNING, STOPPING, FORCE_STOPPING }; filesystem::LocalFileSystem& file_system_; size_t max_concurrent_transfers{1024}; std::mt19937 transfer_random_token_generator{std::random_device{}()}; std::mutex result_notify_mutex{}; std::condition_variable result_notify_cv{}; std::mutex transfers_mutex{}; std::mutex transfer_create_mutex{}; std::deque> transfers_{}; std::deque> pending_transfers{}; enum ServerState { STOPPED, RUNNING } state{ServerState::STOPPED}; struct { bool active{false}; std::thread dispatch_thread{}; std::mutex mutex{}; std::condition_variable notify_cv{}; } disconnect; struct { std::mutex mutex; bool active{false}; std::thread dispatch_thread{}; struct event_base* event_base{nullptr}; std::deque> bindings{}; } network{}; struct { DiskIOLoopState state{DiskIOLoopState::STOPPED}; std::thread dispatch_thread{}; std::mutex queue_lock{}; std::condition_variable notify_work_awaiting{}; std::condition_variable notify_client_processed{}; FileClient* queue_head{nullptr}; FileClient** queue_tail{&queue_head}; } disk_io{}; template std::shared_ptr> create_execute_response() { return std::make_shared>(this->result_notify_mutex, this->result_notify_cv); } std::shared_ptr>> initialize_transfer(Transfer::Direction, const std::shared_ptr &, ChannelId, Transfer::TargetType, const TransferInfo &info); [[nodiscard]] NetworkingStartResult start_networking(); [[nodiscard]] DiskIOStartResult start_disk_io(); [[nodiscard]] ClientWorkerStartResult start_client_worker(); void shutdown_networking(); void shutdown_disk_io(); void shutdown_client_worker(); void disconnect_client(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */, bool /* flush network */); [[nodiscard]] NetworkInitializeResult initialize_networking(const std::shared_ptr& /* client */, int /* file descriptor */); /* might block 'till all IO operations have been succeeded */ void finalize_networking(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */); [[nodiscard]] FileInitializeResult initialize_file_io(const std::shared_ptr& /* client */); void finalize_file_io(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */); [[nodiscard]] bool initialize_client_ssl(const std::shared_ptr& /* client */); void finalize_client_ssl(const std::shared_ptr& /* client */); void enqueue_disk_io(const std::shared_ptr& /* client */); void execute_disk_io(const std::shared_ptr& /* client */); void test_disconnecting_state(const std::shared_ptr& /* client */); [[nodiscard]] TransferUploadRawResult handle_transfer_upload_raw(const std::shared_ptr& /* client */, const char * /* buffer */, size_t /* length */, size_t* /* bytes written */); [[nodiscard]] TransferUploadHTTPResult handle_transfer_upload_http(const std::shared_ptr& /* client */, const char * /* buffer */, size_t /* length */); void send_http_response(const std::shared_ptr& /* client */, http::HttpResponse& /* response */); static void callback_transfer_network_write(int, short, void*); static void callback_transfer_network_read(int, short, void*); static void callback_transfer_network_throttle(int, short, void*); static void callback_transfer_network_accept(int, short, void*); static void dispatch_loop_client_worker(void*); static void dispatch_loop_network(void*); static void dispatch_loop_disk_io(void*); void report_transfer_statistics(const std::shared_ptr& /* client */); [[nodiscard]] TransferStatistics generate_transfer_statistics_report(const std::shared_ptr& /* client */); void invoke_aborted_callback(const std::shared_ptr& /* client */, const TransferError& /* error */); void invoke_aborted_callback(const std::shared_ptr& /* pending transfer */, const TransferError& /* error */); size_t handle_transfer_read(const std::shared_ptr& /* client */, const char* /* buffer */, size_t /* bytes */); size_t handle_transfer_read_raw(const std::shared_ptr& /* client */, const char* /* buffer */, size_t /* bytes */); [[nodiscard]] TransferKeyApplyResult handle_transfer_key_provided(const std::shared_ptr& /* client */, std::string& /* error */); }; } class LocalVirtualFileServer : public VirtualFileServer { public: explicit LocalVirtualFileServer(ServerId server_id, std::string unique_id) : VirtualFileServer{server_id, std::move(unique_id)} {} void max_networking_upload_bandwidth(int64_t value) override; void max_networking_download_bandwidth(int64_t value) override; networking::NetworkThrottle upload_throttle{}; networking::NetworkThrottle download_throttle{}; }; class LocalFileProvider : public AbstractFileServer { public: LocalFileProvider(); virtual ~LocalFileProvider(); [[nodiscard]] bool initialize(std::string& /* error */); void finalize(); [[nodiscard]] std::string file_base_path() const override; filesystem::AbstractProvider &file_system() override; transfer::AbstractProvider &file_transfer() override; std::shared_ptr register_server(ServerId /* server id */) override; void unregister_server(ServerId /* server id */) override; private: filesystem::LocalFileSystem file_system_; transfer::LocalFileTransfer file_transfer_; }; }