#pragma once #include #include #include #include #include #include #include #include #include #include #define TRANSFER_MAX_CACHED_BYTES (1024 * 1024 * 1) // Buffer up to 1mb namespace ts::server::file { namespace filesystem { #ifdef FS_INCLUDED namespace fs = std::experimental::filesystem; #endif class LocalFileSystem : public filesystem::AbstractProvider { using FileModifyError = filesystem::FileModifyError; using DirectoryModifyError = filesystem::DirectoryModifyError; public: enum struct FileCategory { ICON, AVATAR, CHANNEL }; virtual ~LocalFileSystem(); bool initialize(std::string & /* error */, const std::string & /* root path */); void finalize(); void lock_file(const std::string& /* absolute path */); void unlock_file(const std::string& /* absolute path */); [[nodiscard]] inline const auto &root_path() const { return this->root_path_; } [[nodiscard]] std::string absolute_avatar_path(ServerId, const std::string&); [[nodiscard]] std::string absolute_icon_path(ServerId, const std::string&); [[nodiscard]] std::string absolute_channel_path(ServerId, ChannelId, const std::string&); std::shared_ptr> initialize_server(ServerId /* server */) override; std::shared_ptr> delete_server(ServerId /* server */) override; std::shared_ptr query_channel_directory(ServerId id, ChannelId channelId, const std::string &string) override; std::shared_ptr> create_channel_directory(ServerId id, ChannelId channelId, const std::string &string) override; std::shared_ptr> delete_channel_file(ServerId id, ChannelId channelId, const std::string &string) override; std::shared_ptr> rename_channel_file(ServerId id, ChannelId channelId, const std::string &, const std::string &) override; std::shared_ptr query_icon_directory(ServerId id) override; std::shared_ptr> delete_icon(ServerId id, const std::string &string) override; std::shared_ptr query_avatar_directory(ServerId id) override; std::shared_ptr> delete_avatar(ServerId id, const std::string &string) override; private: #ifdef FS_INCLUDED [[nodiscard]] fs::path server_path(ServerId); [[nodiscard]] fs::path server_channel_path(ServerId, ChannelId); [[nodiscard]] static bool exceeds_base_path(const fs::path& /* base */, const fs::path& /* target */); [[nodiscard]] bool is_any_file_locked(const fs::path& /* base */, const std::string& /* path */, std::string& /* file (relative to the base) */); [[nodiscard]] std::shared_ptr> delete_file(const fs::path& /* base */, const std::string &string); [[nodiscard]] std::shared_ptr query_directory(const fs::path& /* base */, const std::string &string, bool); #endif template std::shared_ptr> create_execute_response() { return std::make_shared>(this->result_notify_mutex, this->result_notify_cv); } std::string target_file_path(FileCategory type, ts::ServerId sid, ts::ChannelId cid, const std::string &path); std::mutex result_notify_mutex{}; std::condition_variable result_notify_cv{}; std::string root_path_{}; std::mutex locked_files_mutex{}; std::deque locked_files_{}; }; } namespace transfer { class LocalFileTransfer; struct Buffer { Buffer* next{nullptr}; size_t capacity{0}; size_t length{0}; size_t offset{0}; char data[1]{}; }; [[nodiscard]] extern Buffer* allocate_buffer(size_t); extern void free_buffer(Buffer*); struct NetworkThrottle { constexpr static auto kThrottleTimespanMs{250}; typedef uint8_t span_t; ssize_t max_bytes{0}; span_t current_index{0}; size_t bytes_send{0}; inline bool increase_bytes(size_t bytes) { auto current_ms = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); auto current_span = (span_t) (current_ms / kThrottleTimespanMs); if(this->current_index != current_span) { this->current_index = current_span; this->bytes_send = bytes; } else { this->bytes_send += bytes; } return this->max_bytes > 0 && this->bytes_send >= this->max_bytes; } inline void set_max_bandwidth(ssize_t bytes_per_second) { if(bytes_per_second <= 0) this->max_bytes = -1; else this->max_bytes = bytes_per_second * kThrottleTimespanMs / 1000; } [[nodiscard]] inline bool should_throttle(timeval& next_timestamp) { if(this->max_bytes <= 0) return false; auto current_ms = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); auto current_span = (span_t) (current_ms / kThrottleTimespanMs); if(this->current_index != current_span) return false; if(this->bytes_send < this->max_bytes) return false; next_timestamp.tv_usec = (kThrottleTimespanMs - current_ms % kThrottleTimespanMs) * 1000; next_timestamp.tv_sec = next_timestamp.tv_usec / 1000000; next_timestamp.tv_usec -= next_timestamp.tv_sec * 1000000; return true; } [[nodiscard]] inline size_t bytes_left() const { if(this->max_bytes <= 0) return (size_t) -1; auto current_ms = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); auto current_span = (span_t) (current_ms / kThrottleTimespanMs); if(this->current_index != current_span) return this->max_bytes; if(this->bytes_send < this->max_bytes) return this->max_bytes - this->bytes_send; return 0; } [[nodiscard]] inline std::chrono::milliseconds expected_writing_time(size_t bytes) const { if(this->max_bytes <= 0) return std::chrono::milliseconds{0}; return std::chrono::seconds{bytes / (this->max_bytes * (1000 / kThrottleTimespanMs))}; } }; /* all variables are locked via the state_mutex */ struct FileClient : std::enable_shared_from_this { LocalFileTransfer* handle; std::shared_ptr transfer{nullptr}; std::shared_mutex state_mutex{}; enum { STATE_AWAITING_KEY, /* includes SSL/HTTP init */ STATE_TRANSFERRING, STATE_DISCONNECTING, STATE_DISCONNECTED } state{STATE_AWAITING_KEY}; enum NetworkingProtocol { PROTOCOL_UNKNOWN, PROTOCOL_HTTPS, PROTOCOL_TS_V1 }; enum HTTPUploadState { HTTP_AWAITING_HEADER, HTTP_STATE_AWAITING_BOUNDARY, HTTP_STATE_UPLOADING, HTTP_STATE_DOWNLOADING }; struct { bool file_locked{false}; std::string absolute_path{}; int file_descriptor{0}; bool currently_processing{false}; FileClient* next_client{nullptr}; } file{}; struct { size_t provided_bytes{0}; char key[TRANSFER_KEY_LENGTH]{0}; } transfer_key{}; /* will be used for both directions */ struct { std::mutex mutex{}; size_t bytes{0}; bool buffering_stopped{false}; Buffer* buffer_head{nullptr}; Buffer** buffer_tail{&buffer_head}; } buffer{}; struct { sockaddr_storage address{}; int file_descriptor{0}; NetworkingProtocol protocol{PROTOCOL_UNKNOWN}; struct event* event_read{nullptr}; struct event* event_write{nullptr}; struct event* event_throttle{nullptr}; bool add_event_write{false}, add_event_read{false}; std::chrono::system_clock::time_point disconnect_timeout{}; NetworkThrottle throttle; pipes::SSL pipe_ssl{}; bool pipe_ssl_init{false}; std::unique_ptr http_header_buffer{nullptr, free_buffer}; HTTPUploadState http_state{HTTPUploadState::HTTP_AWAITING_HEADER}; /* Only read the transfer key length at the beginning. We than have the actual limit which will be set via throttle */ size_t max_read_buffer_size{TRANSFER_KEY_LENGTH}; } networking{}; struct { size_t network_bytes_send{0}; size_t network_bytes_received{0}; size_t file_bytes_transferred{0}; /* used for delta statistics */ size_t last_network_bytes_send{0}; size_t last_network_bytes_received{0}; /* used for delta statistics */ size_t last_file_bytes_transferred{0}; size_t disk_bytes_read{0}; size_t disk_bytes_write{0}; } statistics{}; struct { std::chrono::system_clock::time_point last_write{}; std::chrono::system_clock::time_point last_read{}; std::chrono::system_clock::time_point connected{}; std::chrono::system_clock::time_point key_received{}; std::chrono::system_clock::time_point disconnecting{}; } timings; explicit FileClient(LocalFileTransfer* handle) : handle{handle} {} ~FileClient(); void add_network_write_event(bool /* ignore bandwidth limits */); void add_network_write_event_nolock(bool /* ignore bandwidth limits */); /* will check if we've enough space in out read buffer again */ void add_network_read_event(bool /* ignore bandwidth limits */); bool send_file_bytes(const void* /* buffer */, size_t /* length */); bool enqueue_buffer_bytes(const void* /* buffer */, size_t /* length */); [[nodiscard]] inline std::string log_prefix() const { return "[" + net::to_string(this->networking.address) + "]"; } }; enum struct DiskIOStartResult { SUCCESS, OUT_OF_MEMORY }; enum struct NetworkingStartResult { SUCCESS, OUT_OF_MEMORY, NO_BINDINGS }; enum struct ClientWorkerStartResult { SUCCESS }; enum struct NetworkInitializeResult { SUCCESS, OUT_OF_MEMORY }; enum struct FileInitializeResult { SUCCESS, INVALID_TRANSFER_DIRECTION, OUT_OF_MEMORY, PROCESS_FILE_LIMIT_REACHED, SYSTEM_FILE_LIMIT_REACHED, FILE_IS_BUSY, FILE_DOES_NOT_EXISTS, FILE_SYSTEM_ERROR, FILE_IS_A_DIRECTORY, FILE_TOO_LARGE, DISK_IS_READ_ONLY, FILE_SIZE_MISMATCH, FILE_SEEK_FAILED, FILE_IS_NOT_ACCESSIBLE, MAX }; constexpr static std::array kFileInitializeResultMessages{ /* SUCCESS */ "success", /* INVALID_TRANSFER_DIRECTION */ "invalid file transfer direction", /* OUT_OF_MEMORY */ "out of memory", /* PROCESS_FILE_LIMIT_REACHED */ "process file limit reached", /* SYSTEM_FILE_LIMIT_REACHED */ "system file limit reached", /* FILE_IS_BUSY */ "target file is busy", /* FILE_DOES_NOT_EXISTS */ "target file does not exists", /* FILE_SYSTEM_ERROR */ "internal file system error", /* FILE_IS_A_DIRECTORY */ "target file is a directory", /* FILE_TOO_LARGE */ "file is too large", /* DISK_IS_READ_ONLY */ "disk is in read only mode", /* FILE_SIZE_MISMATCH */ "file size mismatch", /* FILE_SEEK_FAILED */ "failed to seek to target file offset", /* FILE_IS_NOT_ACCESSIBLE */ "file is not accessible" }; enum struct TransferKeyApplyResult { SUCCESS, FILE_ERROR, UNKNOWN_KEY, INTERNAL_ERROR }; enum struct TransferUploadRawResult { MORE_DATA_TO_RECEIVE, FINISH, /* UNKNOWN ERROR */ }; enum struct TransferUploadHTTPResult { MORE_DATA_TO_RECEIVE, FINISH, BOUNDARY_MISSING, BOUNDARY_TOKEN_INVALID, BOUNDARY_INVALID, MISSING_CONTENT_TYPE, INVALID_CONTENT_TYPE /* UNKNOWN ERROR */ }; struct NetworkBinding : std::enable_shared_from_this { std::string hostname{}; sockaddr_storage address{}; int file_descriptor{-1}; struct event* accept_event{nullptr}; LocalFileTransfer* handle{nullptr}; }; class LocalFileTransfer : public AbstractProvider { public: explicit LocalFileTransfer(filesystem::LocalFileSystem&); ~LocalFileTransfer(); [[nodiscard]] bool start(const std::deque>& /* bindings */, const std::shared_ptr& /* ssl options */); void stop(); [[nodiscard]] inline const auto& ssl_options() const { return this->ssl_options_; } inline void set_ssl_options(const std::shared_ptr& options) { this->ssl_options_ = options; } std::shared_ptr>> initialize_channel_transfer(Transfer::Direction direction, ServerId id, ChannelId channelId, const TransferInfo &info) override; std::shared_ptr>> initialize_icon_transfer(Transfer::Direction direction, ServerId id, const TransferInfo &info) override; std::shared_ptr>> initialize_avatar_transfer(Transfer::Direction direction, ServerId id, const TransferInfo &info) override; std::shared_ptr> stop_transfer(transfer_id id, bool) override; private: enum struct DiskIOLoopState { STOPPED, RUNNING, STOPPING, FORCE_STOPPING }; filesystem::LocalFileSystem& file_system_; std::atomic current_transfer_id{0}; std::mt19937 transfer_random_token_generator{std::random_device{}()}; std::mutex result_notify_mutex{}; std::condition_variable result_notify_cv{}; std::mutex transfers_mutex{}; std::deque> transfers_{}; std::deque> pending_transfers{}; std::shared_ptr ssl_options_{}; enum ServerState { STOPPED, RUNNING } state{ServerState::STOPPED}; struct { bool active{false}; std::thread dispatch_thread{}; std::mutex mutex{}; std::condition_variable notify_cv{}; } disconnect; struct { bool active{false}; std::thread dispatch_thread{}; struct event_base* event_base{nullptr}; std::deque> bindings{}; } network{}; struct { DiskIOLoopState state{DiskIOLoopState::STOPPED}; std::thread dispatch_thread{}; std::mutex queue_lock{}; std::condition_variable notify_work_awaiting{}; std::condition_variable notify_client_processed{}; FileClient* queue_head{nullptr}; FileClient** queue_tail{&queue_head}; } disk_io{}; template std::shared_ptr> create_execute_response() { return std::make_shared>(this->result_notify_mutex, this->result_notify_cv); } std::shared_ptr>> initialize_transfer(Transfer::Direction, ServerId, ChannelId, Transfer::TargetType, const TransferInfo &info); [[nodiscard]] NetworkingStartResult start_networking(); [[nodiscard]] DiskIOStartResult start_disk_io(); [[nodiscard]] ClientWorkerStartResult start_client_worker(); void shutdown_networking(); void shutdown_disk_io(); void shutdown_client_worker(); void disconnect_client(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */, bool /* flush */); [[nodiscard]] NetworkInitializeResult initialize_networking(const std::shared_ptr& /* client */, int /* file descriptor */); /* might block 'till all IO operations have been succeeded */ void finalize_networking(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */); [[nodiscard]] FileInitializeResult initialize_file_io(const std::shared_ptr& /* client */); void finalize_file_io(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */); [[nodiscard]] bool initialize_client_ssl(const std::shared_ptr& /* client */); void finalize_client_ssl(const std::shared_ptr& /* client */); void enqueue_disk_io(const std::shared_ptr& /* client */); void execute_disk_io(const std::shared_ptr& /* client */); void report_transfer_statistics(const std::shared_ptr& /* client */); [[nodiscard]] TransferUploadRawResult handle_transfer_upload_raw(const std::shared_ptr& /* client */, const char * /* buffer */, size_t /* length */); [[nodiscard]] TransferUploadHTTPResult handle_transfer_upload_http(const std::shared_ptr& /* client */, const char * /* buffer */, size_t /* length */); void send_http_response(const std::shared_ptr& /* client */, http::HttpResponse& /* response */); static void callback_transfer_network_write(int, short, void*); static void callback_transfer_network_read(int, short, void*); static void callback_transfer_network_throttle(int, short, void*); static void callback_transfer_network_accept(int, short, void*); static void dispatch_loop_client_worker(void*); static void dispatch_loop_network(void*); static void dispatch_loop_disk_io(void*); size_t handle_transfer_read(const std::shared_ptr& /* client */, const char* /* buffer */, size_t /* bytes */); size_t handle_transfer_read_raw(const std::shared_ptr& /* client */, const char* /* buffer */, size_t /* bytes */); [[nodiscard]] TransferKeyApplyResult handle_transfer_key_provided(const std::shared_ptr& /* client */, std::string& /* error */); }; } class LocalFileProvider : public AbstractFileServer { public: LocalFileProvider(); virtual ~LocalFileProvider(); [[nodiscard]] bool initialize(std::string& /* error */, const std::shared_ptr& /* ssl options */); void finalize(); filesystem::AbstractProvider &file_system() override; transfer::AbstractProvider &file_transfer() override; private: filesystem::LocalFileSystem file_system_; transfer::LocalFileTransfer file_transfer_; }; }