Fixed a bug related to the file transfer

This commit is contained in:
WolverinDEV 2020-09-16 14:15:06 +02:00
parent 2a1f0187ac
commit 38d6ad4920
11 changed files with 674 additions and 584 deletions

View File

@ -5,6 +5,8 @@
#include <netinet/in.h>
#include <log/LogUtils.h>
#include "LocalFileProvider.h"
#include "LocalFileSystem.h"
#include "LocalFileTransfer.h"
using namespace ts::server;
using LocalFileServer = file::LocalFileProvider;
@ -72,36 +74,42 @@ std::shared_ptr<file::AbstractFileServer> file::server() {
return server_instance;
}
LocalFileServer::LocalFileProvider() : file_system_{}, file_transfer_{this->file_system_} {}
LocalFileServer::~LocalFileProvider() = default;
LocalFileServer::LocalFileProvider() {
this->file_system_ = new filesystem::LocalFileSystem();
this->file_transfer_ = new transfer::LocalFileTransfer(this->file_system_);
}
LocalFileServer::~LocalFileProvider() {
delete this->file_transfer_;
delete this->file_system_;
};
bool LocalFileServer::initialize(std::string &error) {
if(!this->file_system_.initialize(error, "files/"))
if(!this->file_system_->initialize(error, "files/"))
return false;
if(!this->file_transfer_.start()) {
if(!this->file_transfer_->start()) {
error = "transfer server startup failed";
this->file_system_.finalize();
this->file_system_->finalize();
return false;
}
return true;
}
void LocalFileServer::finalize() {
this->file_transfer_.stop();
this->file_system_.finalize();
this->file_transfer_->stop();
this->file_system_->finalize();
}
file::filesystem::AbstractProvider &LocalFileServer::file_system() {
return this->file_system_;
return *this->file_system_;
}
file::transfer::AbstractProvider &LocalFileServer::file_transfer() {
return this->file_transfer_;
return *this->file_transfer_;
}
std::string file::LocalFileProvider::file_base_path() const {
return this->file_system_.root_path();
return this->file_system_->root_path();
}
std::shared_ptr<file::VirtualFileServer> LocalFileServer::register_server(ServerId server_id) {

View File

@ -14,526 +14,9 @@
#include <misc/memtracker.h>
#include "./NetTools.h"
#define TRANSFER_MAX_CACHED_BYTES (1024 * 1024 * 1) // Buffer up to 1mb
namespace ts::server::file {
namespace filesystem {
#ifdef FS_INCLUDED
namespace fs = std::experimental::filesystem;
#endif
class LocalFileSystem : public filesystem::AbstractProvider {
using FileModifyError = filesystem::FileModifyError;
using DirectoryModifyError = filesystem::DirectoryModifyError;
public:
enum struct FileCategory {
ICON,
AVATAR,
CHANNEL
};
virtual ~LocalFileSystem();
bool initialize(std::string & /* error */, const std::string & /* root path */);
void finalize();
void lock_file(const std::string& /* absolute path */);
void unlock_file(const std::string& /* absolute path */);
[[nodiscard]] inline const auto &root_path() const { return this->root_path_; }
[[nodiscard]] std::string absolute_avatar_path(const std::shared_ptr<VirtualFileServer> &, const std::string&);
[[nodiscard]] std::string absolute_icon_path(const std::shared_ptr<VirtualFileServer> &, const std::string&);
[[nodiscard]] std::string absolute_channel_path(const std::shared_ptr<VirtualFileServer> &, ChannelId, const std::string&);
std::shared_ptr<ExecuteResponse<ServerCommandError>> initialize_server(const std::shared_ptr<VirtualFileServer> & /* server */) override;
std::shared_ptr<ExecuteResponse<ServerCommandError>> delete_server(const std::shared_ptr<VirtualFileServer> & /* server */) override;
std::shared_ptr<ExecuteResponse<FileInfoError, FileInfoResponse>>
query_channel_info(const std::shared_ptr<VirtualFileServer> & /* server */, const std::vector<std::tuple<ChannelId, std::string>>& /* names */) override;
std::shared_ptr<directory_query_response_t>
query_channel_directory(const std::shared_ptr<VirtualFileServer> & id, ChannelId channelId, const std::string &string) override;
std::shared_ptr<ExecuteResponse<DirectoryModifyError>>
create_channel_directory(const std::shared_ptr<VirtualFileServer> & id, ChannelId channelId, const std::string &string) override;
std::shared_ptr<ExecuteResponse<FileDeleteError, FileDeleteResponse>>
delete_channel_files(const std::shared_ptr<VirtualFileServer> & id, ChannelId channelId, const std::vector<std::string> &string) override;
std::shared_ptr<ExecuteResponse<FileModifyError>>
rename_channel_file(const std::shared_ptr<VirtualFileServer> & id, ChannelId channelId, const std::string &, ChannelId, const std::string &) override;
std::shared_ptr<ExecuteResponse<FileInfoError, FileInfoResponse>>
query_icon_info(const std::shared_ptr<VirtualFileServer> & /* server */, const std::vector<std::string>& /* names */) override;
std::shared_ptr<directory_query_response_t> query_icon_directory(const std::shared_ptr<VirtualFileServer> & id) override;
std::shared_ptr<ExecuteResponse<FileDeleteError, FileDeleteResponse>>
delete_icons(const std::shared_ptr<VirtualFileServer> & id, const std::vector<std::string> &string) override;
std::shared_ptr<ExecuteResponse<FileInfoError, FileInfoResponse>>
query_avatar_info(const std::shared_ptr<VirtualFileServer> & /* server */, const std::vector<std::string>& /* names */) override;
std::shared_ptr<directory_query_response_t> query_avatar_directory(const std::shared_ptr<VirtualFileServer> & id) override;
std::shared_ptr<ExecuteResponse<FileDeleteError, FileDeleteResponse>>
delete_avatars(const std::shared_ptr<VirtualFileServer> & id, const std::vector<std::string> &string) override;
private:
#ifdef FS_INCLUDED
[[nodiscard]] fs::path server_path(const std::shared_ptr<VirtualFileServer> &);
[[nodiscard]] fs::path server_channel_path(const std::shared_ptr<VirtualFileServer> &, ChannelId);
[[nodiscard]] static bool exceeds_base_path(const fs::path& /* base */, const fs::path& /* target */);
[[nodiscard]] bool is_any_file_locked(const fs::path& /* base */, const std::string& /* path */, std::string& /* file (relative to the base) */);
[[nodiscard]] std::shared_ptr<ExecuteResponse<FileDeleteError, FileDeleteResponse>>
delete_files(const fs::path& /* base */, const std::vector<std::string> &string);
[[nodiscard]] std::shared_ptr<directory_query_response_t>
query_directory(const fs::path& /* base */, const std::string &string, bool);
[[nodiscard]] std::shared_ptr<ExecuteResponse<FileInfoError, FileInfoResponse>>
query_file_info(const std::vector<std::tuple<fs::path, std::string>> &string);
#endif
template <typename error_t, typename result_t = EmptyExecuteResponse>
std::shared_ptr<ExecuteResponse<error_t, result_t>> create_execute_response() {
return std::make_shared<ExecuteResponse<error_t, result_t>>(this->result_notify_mutex, this->result_notify_cv);
}
std::string target_file_path(FileCategory type, const std::shared_ptr<VirtualFileServer> &sid, ts::ChannelId cid, const std::string &path);
std::mutex result_notify_mutex{};
std::condition_variable result_notify_cv{};
std::string root_path_{};
std::mutex locked_files_mutex{};
std::deque<std::string> locked_files_{};
};
}
namespace transfer {
class LocalFileTransfer;
struct Buffer {
Buffer* next{nullptr};
size_t capacity{0};
size_t length{0};
size_t offset{0};
char data[1]{};
};
[[nodiscard]] extern Buffer* allocate_buffer(size_t);
extern void free_buffer(Buffer*);
/* all variables are locked via the state_mutex */
struct FileClient : std::enable_shared_from_this<FileClient> {
LocalFileTransfer* handle;
std::shared_ptr<Transfer> transfer{nullptr};
std::shared_mutex state_mutex{};
enum {
STATE_AWAITING_KEY, /* includes SSL/HTTP init */
STATE_TRANSFERRING,
STATE_FLUSHING,
STATE_DISCONNECTED
} state{STATE_AWAITING_KEY};
bool finished_signal_send{false};
enum NetworkingProtocol {
PROTOCOL_UNKNOWN,
PROTOCOL_HTTPS,
PROTOCOL_TS_V1
};
enum HTTPUploadState {
HTTP_AWAITING_HEADER,
HTTP_STATE_AWAITING_BOUNDARY,
HTTP_STATE_AWAITING_BOUNDARY_END,
HTTP_STATE_UPLOADING,
HTTP_STATE_DOWNLOADING
};
struct {
bool file_locked{false};
int file_descriptor{0};
bool currently_processing{false};
FileClient* next_client{nullptr};
bool query_media_bytes{false};
uint8_t media_bytes[TRANSFER_MEDIA_BYTES_LENGTH]{0};
uint8_t media_bytes_length{0};
} file{};
struct {
size_t provided_bytes{0};
char key[TRANSFER_KEY_LENGTH]{0};
} transfer_key{};
struct {
std::mutex mutex{};
size_t bytes{0};
bool buffering_stopped{false};
bool write_disconnected{false};
Buffer* buffer_head{nullptr};
Buffer** buffer_tail{&buffer_head};
} network_buffer{};
struct {
std::mutex mutex{};
size_t bytes{0};
bool buffering_stopped{false};
bool write_disconnected{false};
Buffer* buffer_head{nullptr};
Buffer** buffer_tail{&buffer_head};
} disk_buffer{};
struct {
sockaddr_storage address{};
int file_descriptor{0};
NetworkingProtocol protocol{PROTOCOL_UNKNOWN};
struct event* event_read{nullptr};
struct event* event_write{nullptr};
struct event* event_throttle{nullptr};
bool add_event_write{false}, add_event_read{false};
std::chrono::system_clock::time_point disconnect_timeout{};
networking::NetworkThrottle client_throttle{};
/* the right side is the server throttle */
networking::DualNetworkThrottle throttle{&client_throttle, &networking::NetworkThrottle::kNoThrottle};
pipes::SSL pipe_ssl{};
bool pipe_ssl_init{false};
std::unique_ptr<Buffer, decltype(free_buffer)*> http_header_buffer{nullptr, free_buffer};
HTTPUploadState http_state{HTTPUploadState::HTTP_AWAITING_HEADER};
std::string http_boundary{};
/* Only read the transfer key length at the beginning. We than have the actual limit which will be set via throttle */
size_t max_read_buffer_size{TRANSFER_KEY_LENGTH};
} networking{};
struct {
networking::TransferStatistics network_send{};
networking::TransferStatistics network_received{};
networking::TransferStatistics file_transferred{};
networking::TransferStatistics disk_bytes_read{};
networking::TransferStatistics disk_bytes_write{};
} statistics{};
struct {
std::chrono::system_clock::time_point last_write{};
std::chrono::system_clock::time_point last_read{};
std::chrono::system_clock::time_point connected{};
std::chrono::system_clock::time_point key_received{};
std::chrono::system_clock::time_point disconnecting{};
} timings;
explicit FileClient(LocalFileTransfer* handle) : handle{handle} { memtrack::allocated<FileClient>(this); }
~FileClient();
void add_network_write_event(bool /* ignore bandwidth limits */);
void add_network_write_event_nolock(bool /* ignore bandwidth limits */);
/* will check if we've enough space in out read buffer again */
void add_network_read_event(bool /* ignore bandwidth limits */);
bool send_file_bytes(const void* /* buffer */, size_t /* length */);
bool enqueue_network_buffer_bytes(const void* /* buffer */, size_t /* length */);
bool enqueue_disk_buffer_bytes(const void* /* buffer */, size_t /* length */);
/* these function clear the buffers and set the write disconnected flags to true so no new buffers will be enqueued */
size_t flush_network_buffer();
void flush_disk_buffer();
[[nodiscard]] bool buffers_flushed();
[[nodiscard]] inline std::string log_prefix() const { return "[" + net::to_string(this->networking.address) + "]"; }
};
enum struct DiskIOStartResult {
SUCCESS,
OUT_OF_MEMORY
};
enum struct NetworkingStartResult {
SUCCESS,
OUT_OF_MEMORY
};
enum struct NetworkingBindResult {
SUCCESS,
BINDING_ALREADY_EXISTS,
NETWORKING_NOT_INITIALIZED,
FAILED_TO_ALLOCATE_SOCKET, /* errno is set */
FAILED_TO_BIND,
FAILED_TO_LISTEN,
OUT_OF_MEMORY,
};
enum struct NetworkingUnbindResult {
SUCCESS,
UNKNOWN_BINDING
};
enum struct ClientWorkerStartResult {
SUCCESS
};
enum struct NetworkInitializeResult {
SUCCESS,
OUT_OF_MEMORY
};
enum struct FileInitializeResult {
SUCCESS,
INVALID_TRANSFER_DIRECTION,
OUT_OF_MEMORY,
PROCESS_FILE_LIMIT_REACHED,
SYSTEM_FILE_LIMIT_REACHED,
FILE_IS_BUSY,
FILE_DOES_NOT_EXISTS,
FILE_SYSTEM_ERROR,
FILE_IS_A_DIRECTORY,
FILE_TOO_LARGE,
DISK_IS_READ_ONLY,
FILE_SEEK_FAILED,
FILE_SIZE_MISMATCH,
FILE_IS_NOT_ACCESSIBLE,
FAILED_TO_READ_MEDIA_BYTES,
COUNT_NOT_CREATE_DIRECTORIES,
MAX
};
constexpr static std::array<std::string_view, (size_t) FileInitializeResult::MAX> kFileInitializeResultMessages{
/* SUCCESS */ "success",
/* INVALID_TRANSFER_DIRECTION */ "invalid file transfer direction",
/* OUT_OF_MEMORY */ "out of memory",
/* PROCESS_FILE_LIMIT_REACHED */ "process file limit reached",
/* SYSTEM_FILE_LIMIT_REACHED */ "system file limit reached",
/* FILE_IS_BUSY */ "target file is busy",
/* FILE_DOES_NOT_EXISTS */ "target file does not exists",
/* FILE_SYSTEM_ERROR */ "internal file system error",
/* FILE_IS_A_DIRECTORY */ "target file is a directory",
/* FILE_TOO_LARGE */ "file is too large",
/* DISK_IS_READ_ONLY */ "disk is in read only mode",
/* FILE_SEEK_FAILED */ "failed to seek to target file offset",
/* FILE_SIZE_MISMATCH */ "file size miss match",
/* FILE_IS_NOT_ACCESSIBLE */ "file is not accessible",
/* FAILED_TO_READ_MEDIA_BYTES */ "failed to read file media bytes",
/* COUNT_NOT_CREATE_DIRECTORIES */ "could not create required directories"
};
enum struct TransferKeyApplyResult {
SUCCESS,
FILE_ERROR,
UNKNOWN_KEY,
INTERNAL_ERROR
};
enum struct TransferUploadRawResult {
MORE_DATA_TO_RECEIVE,
FINISH,
FINISH_OVERFLOW,
/* UNKNOWN ERROR */
};
enum struct TransferUploadHTTPResult {
MORE_DATA_TO_RECEIVE,
FINISH,
BOUNDARY_MISSING,
BOUNDARY_TOKEN_INVALID,
BOUNDARY_INVALID,
MISSING_CONTENT_TYPE,
INVALID_CONTENT_TYPE
/* UNKNOWN ERROR */
};
struct NetworkBinding {
std::string hostname{};
sockaddr_storage address{};
};
struct ActiveNetworkBinding : std::enable_shared_from_this<ActiveNetworkBinding> {
std::string hostname{};
sockaddr_storage address{};
int file_descriptor{-1};
struct event* accept_event{nullptr};
LocalFileTransfer* handle{nullptr};
};
class LocalFileTransfer : public AbstractProvider {
public:
explicit LocalFileTransfer(filesystem::LocalFileSystem&);
~LocalFileTransfer();
[[nodiscard]] bool start();
void stop();
[[nodiscard]] NetworkingBindResult add_network_binding(const NetworkBinding& /* binding */);
[[nodiscard]] std::vector<NetworkBinding> active_network_bindings();
[[nodiscard]] NetworkingUnbindResult remove_network_binding(const NetworkBinding& /* binding */);
std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>>
initialize_channel_transfer(Transfer::Direction direction, const std::shared_ptr<VirtualFileServer>& server, ChannelId channelId,
const TransferInfo &info) override;
std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>>
initialize_icon_transfer(Transfer::Direction direction, const std::shared_ptr<VirtualFileServer>& server, const TransferInfo &info) override;
std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>>
initialize_avatar_transfer(Transfer::Direction direction, const std::shared_ptr<VirtualFileServer>& server, const TransferInfo &info) override;
std::shared_ptr<ExecuteResponse<TransferListError, std::vector<ActiveFileTransfer>>> list_transfer() override;
std::shared_ptr<ExecuteResponse<TransferActionError>> stop_transfer(const std::shared_ptr<VirtualFileServer>& /* server */, transfer_id id, bool) override;
private:
enum struct DiskIOLoopState {
STOPPED,
RUNNING,
STOPPING,
FORCE_STOPPING
};
filesystem::LocalFileSystem& file_system_;
size_t max_concurrent_transfers{1024};
std::mt19937 transfer_random_token_generator{std::random_device{}()};
std::mutex result_notify_mutex{};
std::condition_variable result_notify_cv{};
std::mutex transfers_mutex{};
std::mutex transfer_create_mutex{};
std::deque<std::shared_ptr<FileClient>> transfers_{};
std::deque<std::shared_ptr<Transfer>> pending_transfers{};
enum ServerState {
STOPPED,
RUNNING
} state{ServerState::STOPPED};
struct {
bool active{false};
std::thread dispatch_thread{};
std::mutex mutex{};
std::condition_variable notify_cv{};
} disconnect;
struct {
std::mutex mutex;
bool active{false};
std::thread dispatch_thread{};
struct event_base* event_base{nullptr};
std::deque<std::shared_ptr<ActiveNetworkBinding>> bindings{};
} network{};
struct {
DiskIOLoopState state{DiskIOLoopState::STOPPED};
std::thread dispatch_thread{};
std::mutex queue_lock{};
std::condition_variable notify_work_awaiting{};
std::condition_variable notify_client_processed{};
FileClient* queue_head{nullptr};
FileClient** queue_tail{&queue_head};
} disk_io{};
template <typename error_t, typename result_t = EmptyExecuteResponse>
std::shared_ptr<ExecuteResponse<error_t, result_t>> create_execute_response() {
return std::make_shared<ExecuteResponse<error_t, result_t>>(this->result_notify_mutex, this->result_notify_cv);
}
std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>>
initialize_transfer(Transfer::Direction, const std::shared_ptr<VirtualFileServer> &, ChannelId, Transfer::TargetType, const TransferInfo &info);
[[nodiscard]] NetworkingStartResult start_networking();
[[nodiscard]] DiskIOStartResult start_disk_io();
[[nodiscard]] ClientWorkerStartResult start_client_worker();
void shutdown_networking();
void shutdown_disk_io();
void shutdown_client_worker();
void disconnect_client(const std::shared_ptr<FileClient>& /* client */, std::unique_lock<std::shared_mutex>& /* state lock */, bool /* flush network */);
[[nodiscard]] NetworkInitializeResult initialize_networking(const std::shared_ptr<FileClient>& /* client */, int /* file descriptor */);
/* might block 'till all IO operations have been succeeded */
void finalize_networking(const std::shared_ptr<FileClient>& /* client */, std::unique_lock<std::shared_mutex>& /* state lock */);
[[nodiscard]] FileInitializeResult initialize_file_io(const std::shared_ptr<FileClient>& /* client */);
void finalize_file_io(const std::shared_ptr<FileClient>& /* client */, std::unique_lock<std::shared_mutex>& /* state lock */);
[[nodiscard]] bool initialize_client_ssl(const std::shared_ptr<FileClient>& /* client */);
void finalize_client_ssl(const std::shared_ptr<FileClient>& /* client */);
void enqueue_disk_io(const std::shared_ptr<FileClient>& /* client */);
void execute_disk_io(const std::shared_ptr<FileClient>& /* client */);
void test_disconnecting_state(const std::shared_ptr<FileClient>& /* client */);
[[nodiscard]] TransferUploadRawResult handle_transfer_upload_raw(const std::shared_ptr<FileClient>& /* client */, const char * /* buffer */, size_t /* length */, size_t* /* bytes written */);
[[nodiscard]] TransferUploadHTTPResult handle_transfer_upload_http(const std::shared_ptr<FileClient>& /* client */, const char * /* buffer */, size_t /* length */);
void send_http_response(const std::shared_ptr<FileClient>& /* client */, http::HttpResponse& /* response */);
static void callback_transfer_network_write(int, short, void*);
static void callback_transfer_network_read(int, short, void*);
static void callback_transfer_network_throttle(int, short, void*);
static void callback_transfer_network_accept(int, short, void*);
static void dispatch_loop_client_worker(void*);
static void dispatch_loop_network(void*);
static void dispatch_loop_disk_io(void*);
void report_transfer_statistics(const std::shared_ptr<FileClient>& /* client */);
[[nodiscard]] TransferStatistics generate_transfer_statistics_report(const std::shared_ptr<FileClient>& /* client */);
void invoke_aborted_callback(const std::shared_ptr<FileClient>& /* client */, const TransferError& /* error */);
void invoke_aborted_callback(const std::shared_ptr<Transfer>& /* pending transfer */, const TransferError& /* error */);
size_t handle_transfer_read(const std::shared_ptr<FileClient>& /* client */, const char* /* buffer */, size_t /* bytes */);
size_t handle_transfer_read_raw(const std::shared_ptr<FileClient>& /* client */, const char* /* buffer */, size_t /* bytes */);
[[nodiscard]] TransferKeyApplyResult handle_transfer_key_provided(const std::shared_ptr<FileClient>& /* client */, std::string& /* error */);
};
}
namespace filesystem { class LocalFileSystem; }
namespace transfer { class LocalFileTransfer; }
class LocalVirtualFileServer : public VirtualFileServer {
public:
@ -563,7 +46,7 @@ namespace ts::server::file {
std::shared_ptr<VirtualFileServer> register_server(ServerId /* server id */) override;
void unregister_server(ServerId /* server id */) override;
private:
filesystem::LocalFileSystem file_system_;
transfer::LocalFileTransfer file_transfer_;
filesystem::LocalFileSystem* file_system_;
transfer::LocalFileTransfer* file_transfer_;
};
}

View File

@ -5,8 +5,8 @@
#define FS_INCLUDED
#include <log/LogUtils.h>
#include "LocalFileProvider.h"
#include "clnpath.h"
#include "./LocalFileSystem.h"
#include "./clnpath.h"
using namespace ts::server::file;
using namespace ts::server::file::filesystem;

View File

@ -0,0 +1,117 @@
//
// Created by WolverinDEV on 16/09/2020.
//
#pragma once
#pragma once
#include <files/FileServer.h>
#include <deque>
#include <utility>
#include <thread>
#include <shared_mutex>
#include <sys/socket.h>
#include <pipes/ws.h>
#include <pipes/ssl.h>
#include <misc/net.h>
#include <misc/spin_mutex.h>
#include <random>
#include <misc/memtracker.h>
#include "./NetTools.h"
namespace ts::server::file::filesystem {
#ifdef FS_INCLUDED
namespace fs = std::experimental::filesystem;
#endif
class LocalFileSystem : public filesystem::AbstractProvider {
using FileModifyError = filesystem::FileModifyError;
using DirectoryModifyError = filesystem::DirectoryModifyError;
public:
enum struct FileCategory {
ICON,
AVATAR,
CHANNEL
};
virtual ~LocalFileSystem();
bool initialize(std::string & /* error */, const std::string & /* root path */);
void finalize();
void lock_file(const std::string& /* absolute path */);
void unlock_file(const std::string& /* absolute path */);
[[nodiscard]] inline const auto &root_path() const { return this->root_path_; }
[[nodiscard]] std::string absolute_avatar_path(const std::shared_ptr<VirtualFileServer> &, const std::string&);
[[nodiscard]] std::string absolute_icon_path(const std::shared_ptr<VirtualFileServer> &, const std::string&);
[[nodiscard]] std::string absolute_channel_path(const std::shared_ptr<VirtualFileServer> &, ChannelId, const std::string&);
std::shared_ptr<ExecuteResponse<ServerCommandError>> initialize_server(const std::shared_ptr<VirtualFileServer> & /* server */) override;
std::shared_ptr<ExecuteResponse<ServerCommandError>> delete_server(const std::shared_ptr<VirtualFileServer> & /* server */) override;
std::shared_ptr<ExecuteResponse<FileInfoError, FileInfoResponse>>
query_channel_info(const std::shared_ptr<VirtualFileServer> & /* server */, const std::vector<std::tuple<ChannelId, std::string>>& /* names */) override;
std::shared_ptr<directory_query_response_t>
query_channel_directory(const std::shared_ptr<VirtualFileServer> & id, ChannelId channelId, const std::string &string) override;
std::shared_ptr<ExecuteResponse<DirectoryModifyError>>
create_channel_directory(const std::shared_ptr<VirtualFileServer> & id, ChannelId channelId, const std::string &string) override;
std::shared_ptr<ExecuteResponse<FileDeleteError, FileDeleteResponse>>
delete_channel_files(const std::shared_ptr<VirtualFileServer> & id, ChannelId channelId, const std::vector<std::string> &string) override;
std::shared_ptr<ExecuteResponse<FileModifyError>>
rename_channel_file(const std::shared_ptr<VirtualFileServer> & id, ChannelId channelId, const std::string &, ChannelId, const std::string &) override;
std::shared_ptr<ExecuteResponse<FileInfoError, FileInfoResponse>>
query_icon_info(const std::shared_ptr<VirtualFileServer> & /* server */, const std::vector<std::string>& /* names */) override;
std::shared_ptr<directory_query_response_t> query_icon_directory(const std::shared_ptr<VirtualFileServer> & id) override;
std::shared_ptr<ExecuteResponse<FileDeleteError, FileDeleteResponse>>
delete_icons(const std::shared_ptr<VirtualFileServer> & id, const std::vector<std::string> &string) override;
std::shared_ptr<ExecuteResponse<FileInfoError, FileInfoResponse>>
query_avatar_info(const std::shared_ptr<VirtualFileServer> & /* server */, const std::vector<std::string>& /* names */) override;
std::shared_ptr<directory_query_response_t> query_avatar_directory(const std::shared_ptr<VirtualFileServer> & id) override;
std::shared_ptr<ExecuteResponse<FileDeleteError, FileDeleteResponse>>
delete_avatars(const std::shared_ptr<VirtualFileServer> & id, const std::vector<std::string> &string) override;
private:
#ifdef FS_INCLUDED
[[nodiscard]] fs::path server_path(const std::shared_ptr<VirtualFileServer> &);
[[nodiscard]] fs::path server_channel_path(const std::shared_ptr<VirtualFileServer> &, ChannelId);
[[nodiscard]] static bool exceeds_base_path(const fs::path& /* base */, const fs::path& /* target */);
[[nodiscard]] bool is_any_file_locked(const fs::path& /* base */, const std::string& /* path */, std::string& /* file (relative to the base) */);
[[nodiscard]] std::shared_ptr<ExecuteResponse<FileDeleteError, FileDeleteResponse>>
delete_files(const fs::path& /* base */, const std::vector<std::string> &string);
[[nodiscard]] std::shared_ptr<directory_query_response_t>
query_directory(const fs::path& /* base */, const std::string &string, bool);
[[nodiscard]] std::shared_ptr<ExecuteResponse<FileInfoError, FileInfoResponse>>
query_file_info(const std::vector<std::tuple<fs::path, std::string>> &string);
#endif
template <typename error_t, typename result_t = EmptyExecuteResponse>
std::shared_ptr<ExecuteResponse<error_t, result_t>> create_execute_response() {
return std::make_shared<ExecuteResponse<error_t, result_t>>(this->result_notify_mutex, this->result_notify_cv);
}
std::string target_file_path(FileCategory type, const std::shared_ptr<VirtualFileServer> &sid, ts::ChannelId cid, const std::string &path);
std::mutex result_notify_mutex{};
std::condition_variable result_notify_cv{};
std::string root_path_{};
std::mutex locked_files_mutex{};
std::deque<std::string> locked_files_{};
};
}

View File

@ -7,6 +7,7 @@
#include <log/LogUtils.h>
#include <random>
#include "./LocalFileProvider.h"
#include "./LocalFileTransfer.h"
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
@ -44,7 +45,7 @@ FileClient::~FileClient() {
memtrack::freed<FileClient>(this);
}
LocalFileTransfer::LocalFileTransfer(filesystem::LocalFileSystem &fs) : file_system_{fs} {}
LocalFileTransfer::LocalFileTransfer(filesystem::LocalFileSystem *fs) : file_system_{fs} {}
LocalFileTransfer::~LocalFileTransfer() = default;
bool LocalFileTransfer::start() {
@ -184,13 +185,13 @@ std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>> L
std::string absolute_path{};
switch (transfer->target_type) {
case Transfer::TARGET_TYPE_AVATAR:
absolute_path = this->file_system_.absolute_avatar_path(transfer->server, transfer->target_file_path);
absolute_path = this->file_system_->absolute_avatar_path(transfer->server, transfer->target_file_path);
break;
case Transfer::TARGET_TYPE_ICON:
absolute_path = this->file_system_.absolute_icon_path(transfer->server, transfer->target_file_path);
absolute_path = this->file_system_->absolute_icon_path(transfer->server, transfer->target_file_path);
break;
case Transfer::TARGET_TYPE_CHANNEL_FILE:
absolute_path = this->file_system_.absolute_channel_path(transfer->server, transfer->channel_id, transfer->target_file_path);
absolute_path = this->file_system_->absolute_channel_path(transfer->server, transfer->channel_id, transfer->target_file_path);
break;
case Transfer::TARGET_TYPE_UNKNOWN:
default:
@ -199,7 +200,7 @@ std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>> L
}
transfer->absolute_file_path = absolute_path;
const auto root_path_length = this->file_system_.root_path().size();
const auto root_path_length = this->file_system_->root_path().size();
if(root_path_length < absolute_path.size())
transfer->relative_file_path = absolute_path.substr(root_path_length);
else

View File

@ -0,0 +1,446 @@
//
// Created by WolverinDEV on 16/09/2020.
//
#pragma once
#pragma once
#include <files/FileServer.h>
#include <deque>
#include <utility>
#include <thread>
#include <shared_mutex>
#include <sys/socket.h>
#include <pipes/ws.h>
#include <pipes/ssl.h>
#include <misc/net.h>
#include <misc/spin_mutex.h>
#include <random>
#include <misc/memtracker.h>
#include "./NetTools.h"
#include "LocalFileSystem.h"
#define TRANSFER_MAX_CACHED_BYTES (1024 * 1024 * 1) // Buffer up to 1mb
namespace ts::server::file::transfer {
class LocalFileTransfer;
struct Buffer {
Buffer* next{nullptr};
size_t capacity{0};
size_t length{0};
size_t offset{0};
char data[1]{};
};
[[nodiscard]] extern Buffer* allocate_buffer(size_t);
extern void free_buffer(Buffer*);
/* all variables are locked via the state_mutex */
struct FileClient : std::enable_shared_from_this<FileClient> {
LocalFileTransfer* handle;
std::shared_ptr<Transfer> transfer{nullptr};
std::shared_mutex state_mutex{};
enum {
STATE_AWAITING_KEY, /* includes SSL/HTTP init */
STATE_TRANSFERRING,
STATE_FLUSHING,
STATE_DISCONNECTED
} state{STATE_AWAITING_KEY};
bool finished_signal_send{false};
enum NetworkingProtocol {
PROTOCOL_UNKNOWN,
PROTOCOL_HTTPS,
PROTOCOL_TS_V1
};
enum HTTPUploadState {
HTTP_AWAITING_HEADER,
HTTP_STATE_AWAITING_BOUNDARY,
HTTP_STATE_AWAITING_BOUNDARY_END,
HTTP_STATE_UPLOADING,
HTTP_STATE_DOWNLOADING
};
struct {
bool file_locked{false};
int file_descriptor{0};
bool currently_processing{false};
FileClient* next_client{nullptr};
bool query_media_bytes{false};
uint8_t media_bytes[TRANSFER_MEDIA_BYTES_LENGTH]{0};
uint8_t media_bytes_length{0};
} file{};
struct {
size_t provided_bytes{0};
char key[TRANSFER_KEY_LENGTH]{0};
} transfer_key{};
struct {
std::mutex mutex{};
size_t bytes{0};
bool buffering_stopped{false};
bool write_disconnected{false};
Buffer* buffer_head{nullptr};
Buffer** buffer_tail{&buffer_head};
} network_buffer{};
struct {
std::mutex mutex{};
size_t bytes{0};
bool buffering_stopped{false};
bool write_disconnected{false};
Buffer* buffer_head{nullptr};
Buffer** buffer_tail{&buffer_head};
} disk_buffer{};
struct {
sockaddr_storage address{};
int file_descriptor{0};
NetworkingProtocol protocol{PROTOCOL_UNKNOWN};
struct event* event_read{nullptr};
struct event* event_write{nullptr};
struct event* event_throttle{nullptr};
bool add_event_write{false}, add_event_read{false};
std::chrono::system_clock::time_point disconnect_timeout{};
networking::NetworkThrottle client_throttle{};
/* the right side is the server throttle */
networking::DualNetworkThrottle throttle{&client_throttle, &networking::NetworkThrottle::kNoThrottle};
pipes::SSL pipe_ssl{};
bool pipe_ssl_init{false};
std::unique_ptr<Buffer, decltype(free_buffer)*> http_header_buffer{nullptr, free_buffer};
HTTPUploadState http_state{HTTPUploadState::HTTP_AWAITING_HEADER};
std::string http_boundary{};
/* Only read the transfer key length at the beginning. We than have the actual limit which will be set via throttle */
size_t max_read_buffer_size{TRANSFER_KEY_LENGTH};
} networking{};
struct {
networking::TransferStatistics network_send{};
networking::TransferStatistics network_received{};
networking::TransferStatistics file_transferred{};
networking::TransferStatistics disk_bytes_read{};
networking::TransferStatistics disk_bytes_write{};
} statistics{};
struct {
std::chrono::system_clock::time_point last_write{};
std::chrono::system_clock::time_point last_read{};
std::chrono::system_clock::time_point connected{};
std::chrono::system_clock::time_point key_received{};
std::chrono::system_clock::time_point disconnecting{};
} timings;
explicit FileClient(LocalFileTransfer* handle) : handle{handle} { memtrack::allocated<FileClient>(this); }
~FileClient();
void add_network_write_event(bool /* ignore bandwidth limits */);
void add_network_write_event_nolock(bool /* ignore bandwidth limits */);
/* will check if we've enough space in out read buffer again */
void add_network_read_event(bool /* ignore bandwidth limits */);
bool send_file_bytes(const void* /* buffer */, size_t /* length */);
bool enqueue_network_buffer_bytes(const void* /* buffer */, size_t /* length */);
bool enqueue_disk_buffer_bytes(const void* /* buffer */, size_t /* length */);
/* these function clear the buffers and set the write disconnected flags to true so no new buffers will be enqueued */
size_t flush_network_buffer();
void flush_disk_buffer();
[[nodiscard]] bool buffers_flushed();
[[nodiscard]] inline std::string log_prefix() const { return "[" + net::to_string(this->networking.address) + "]"; }
};
enum struct DiskIOStartResult {
SUCCESS,
OUT_OF_MEMORY
};
enum struct NetworkingStartResult {
SUCCESS,
OUT_OF_MEMORY
};
enum struct NetworkingBindResult {
SUCCESS,
BINDING_ALREADY_EXISTS,
NETWORKING_NOT_INITIALIZED,
FAILED_TO_ALLOCATE_SOCKET, /* errno is set */
FAILED_TO_BIND,
FAILED_TO_LISTEN,
OUT_OF_MEMORY,
};
enum struct NetworkingUnbindResult {
SUCCESS,
UNKNOWN_BINDING
};
enum struct ClientWorkerStartResult {
SUCCESS
};
enum struct NetworkInitializeResult {
SUCCESS,
OUT_OF_MEMORY
};
enum struct FileInitializeResult {
SUCCESS,
INVALID_TRANSFER_DIRECTION,
OUT_OF_MEMORY,
PROCESS_FILE_LIMIT_REACHED,
SYSTEM_FILE_LIMIT_REACHED,
FILE_IS_BUSY,
FILE_DOES_NOT_EXISTS,
FILE_SYSTEM_ERROR,
FILE_IS_A_DIRECTORY,
FILE_TOO_LARGE,
DISK_IS_READ_ONLY,
FILE_SEEK_FAILED,
FILE_SIZE_MISMATCH,
FILE_IS_NOT_ACCESSIBLE,
FAILED_TO_READ_MEDIA_BYTES,
COUNT_NOT_CREATE_DIRECTORIES,
MAX
};
constexpr static std::array<std::string_view, (size_t) FileInitializeResult::MAX> kFileInitializeResultMessages{
/* SUCCESS */ "success",
/* INVALID_TRANSFER_DIRECTION */ "invalid file transfer direction",
/* OUT_OF_MEMORY */ "out of memory",
/* PROCESS_FILE_LIMIT_REACHED */ "process file limit reached",
/* SYSTEM_FILE_LIMIT_REACHED */ "system file limit reached",
/* FILE_IS_BUSY */ "target file is busy",
/* FILE_DOES_NOT_EXISTS */ "target file does not exists",
/* FILE_SYSTEM_ERROR */ "internal file system error",
/* FILE_IS_A_DIRECTORY */ "target file is a directory",
/* FILE_TOO_LARGE */ "file is too large",
/* DISK_IS_READ_ONLY */ "disk is in read only mode",
/* FILE_SEEK_FAILED */ "failed to seek to target file offset",
/* FILE_SIZE_MISMATCH */ "file size miss match",
/* FILE_IS_NOT_ACCESSIBLE */ "file is not accessible",
/* FAILED_TO_READ_MEDIA_BYTES */ "failed to read file media bytes",
/* COUNT_NOT_CREATE_DIRECTORIES */ "could not create required directories"
};
enum struct TransferKeyApplyResult {
SUCCESS,
FILE_ERROR,
UNKNOWN_KEY,
INTERNAL_ERROR
};
enum struct TransferUploadRawResult {
MORE_DATA_TO_RECEIVE,
FINISH,
FINISH_OVERFLOW,
/* UNKNOWN ERROR */
};
enum struct TransferUploadHTTPResult {
MORE_DATA_TO_RECEIVE,
FINISH,
BOUNDARY_MISSING,
BOUNDARY_TOKEN_INVALID,
BOUNDARY_INVALID,
MISSING_CONTENT_TYPE,
INVALID_CONTENT_TYPE
/* UNKNOWN ERROR */
};
struct NetworkBinding {
std::string hostname{};
sockaddr_storage address{};
};
struct ActiveNetworkBinding : std::enable_shared_from_this<ActiveNetworkBinding> {
std::string hostname{};
sockaddr_storage address{};
int file_descriptor{-1};
struct event* accept_event{nullptr};
LocalFileTransfer* handle{nullptr};
};
class LocalFileTransfer : public AbstractProvider {
public:
explicit LocalFileTransfer(filesystem::LocalFileSystem*);
~LocalFileTransfer();
[[nodiscard]] bool start();
void stop();
[[nodiscard]] NetworkingBindResult add_network_binding(const NetworkBinding& /* binding */);
[[nodiscard]] std::vector<NetworkBinding> active_network_bindings();
[[nodiscard]] NetworkingUnbindResult remove_network_binding(const NetworkBinding& /* binding */);
std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>>
initialize_channel_transfer(Transfer::Direction direction, const std::shared_ptr<VirtualFileServer>& server, ChannelId channelId,
const TransferInfo &info) override;
std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>>
initialize_icon_transfer(Transfer::Direction direction, const std::shared_ptr<VirtualFileServer>& server, const TransferInfo &info) override;
std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>>
initialize_avatar_transfer(Transfer::Direction direction, const std::shared_ptr<VirtualFileServer>& server, const TransferInfo &info) override;
std::shared_ptr<ExecuteResponse<TransferListError, std::vector<ActiveFileTransfer>>> list_transfer() override;
std::shared_ptr<ExecuteResponse<TransferActionError>> stop_transfer(const std::shared_ptr<VirtualFileServer>& /* server */, transfer_id id, bool) override;
private:
enum struct DiskIOLoopState {
STOPPED,
RUNNING,
STOPPING,
FORCE_STOPPING
};
filesystem::LocalFileSystem* file_system_;
size_t max_concurrent_transfers{1024};
std::mt19937 transfer_random_token_generator{std::random_device{}()};
std::mutex result_notify_mutex{};
std::condition_variable result_notify_cv{};
std::mutex transfers_mutex{};
std::mutex transfer_create_mutex{};
std::deque<std::shared_ptr<FileClient>> transfers_{};
std::deque<std::shared_ptr<Transfer>> pending_transfers{};
enum ServerState {
STOPPED,
RUNNING
} state{ServerState::STOPPED};
struct {
bool active{false};
std::thread dispatch_thread{};
std::mutex mutex{};
std::condition_variable notify_cv{};
} disconnect;
struct {
std::mutex mutex;
bool active{false};
std::thread dispatch_thread{};
struct event_base* event_base{nullptr};
std::deque<std::shared_ptr<ActiveNetworkBinding>> bindings{};
} network{};
struct {
DiskIOLoopState state{DiskIOLoopState::STOPPED};
std::thread dispatch_thread{};
std::mutex queue_lock{};
std::condition_variable notify_work_awaiting{};
std::condition_variable notify_client_processed{};
FileClient* queue_head{nullptr};
FileClient** queue_tail{&queue_head};
} disk_io{};
template <typename error_t, typename result_t = EmptyExecuteResponse>
std::shared_ptr<ExecuteResponse<error_t, result_t>> create_execute_response() {
return std::make_shared<ExecuteResponse<error_t, result_t>>(this->result_notify_mutex, this->result_notify_cv);
}
std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>>
initialize_transfer(Transfer::Direction, const std::shared_ptr<VirtualFileServer> &, ChannelId, Transfer::TargetType, const TransferInfo &info);
[[nodiscard]] NetworkingStartResult start_networking();
[[nodiscard]] DiskIOStartResult start_disk_io();
[[nodiscard]] ClientWorkerStartResult start_client_worker();
void shutdown_networking();
void shutdown_disk_io();
void shutdown_client_worker();
void disconnect_client(const std::shared_ptr<FileClient>& /* client */, std::unique_lock<std::shared_mutex>& /* state lock */, bool /* flush network */);
[[nodiscard]] NetworkInitializeResult initialize_networking(const std::shared_ptr<FileClient>& /* client */, int /* file descriptor */);
/* might block 'till all IO operations have been succeeded */
void finalize_networking(const std::shared_ptr<FileClient>& /* client */, std::unique_lock<std::shared_mutex>& /* state lock */);
[[nodiscard]] FileInitializeResult initialize_file_io(const std::shared_ptr<FileClient>& /* client */);
void finalize_file_io(const std::shared_ptr<FileClient>& /* client */, std::unique_lock<std::shared_mutex>& /* state lock */);
[[nodiscard]] bool initialize_client_ssl(const std::shared_ptr<FileClient>& /* client */);
void finalize_client_ssl(const std::shared_ptr<FileClient>& /* client */);
void enqueue_disk_io(const std::shared_ptr<FileClient>& /* client */);
void execute_disk_io(const std::shared_ptr<FileClient>& /* client */);
void test_disconnecting_state(const std::shared_ptr<FileClient>& /* client */);
[[nodiscard]] TransferUploadRawResult handle_transfer_upload_raw(const std::shared_ptr<FileClient>& /* client */, const char * /* buffer */, size_t /* length */, size_t* /* bytes written */);
[[nodiscard]] TransferUploadHTTPResult handle_transfer_upload_http(const std::shared_ptr<FileClient>& /* client */, const char * /* buffer */, size_t /* length */);
void send_http_response(const std::shared_ptr<FileClient>& /* client */, http::HttpResponse& /* response */);
static void callback_transfer_network_write(int, short, void*);
static void callback_transfer_network_read(int, short, void*);
static void callback_transfer_network_throttle(int, short, void*);
static void callback_transfer_network_accept(int, short, void*);
static void dispatch_loop_client_worker(void*);
static void dispatch_loop_network(void*);
static void dispatch_loop_disk_io(void*);
void report_transfer_statistics(const std::shared_ptr<FileClient>& /* client */);
[[nodiscard]] TransferStatistics generate_transfer_statistics_report(const std::shared_ptr<FileClient>& /* client */);
void invoke_aborted_callback(const std::shared_ptr<FileClient>& /* client */, const TransferError& /* error */);
void invoke_aborted_callback(const std::shared_ptr<Transfer>& /* pending transfer */, const TransferError& /* error */);
size_t handle_transfer_read(const std::shared_ptr<FileClient>& /* client */, const char* /* buffer */, size_t /* bytes */);
size_t handle_transfer_read_raw(const std::shared_ptr<FileClient>& /* client */, const char* /* buffer */, size_t /* bytes */);
[[nodiscard]] TransferKeyApplyResult handle_transfer_key_provided(const std::shared_ptr<FileClient>& /* client */, std::string& /* error */);
};
}

View File

@ -6,6 +6,7 @@
#include <event2/event.h>
#include <log/LogUtils.h>
#include "./LocalFileProvider.h"
#include "./LocalFileTransfer.h"
using namespace ts::server::file;
using namespace ts::server::file::transfer;
@ -113,7 +114,7 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
continue;
}
provider->report_transfer_statistics(transfer->shared_from_this());
provider->report_transfer_statistics(transfer);
}
}
@ -193,9 +194,14 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
{
std::unique_lock slock{client->state_mutex};
client->state = FileClient::STATE_DISCONNECTED;
provider->finalize_file_io(client, slock);
provider->finalize_client_ssl(client);
/*
* First of all disconnect the client from the network so no actions could be triggered by that way.
* Secondly finalize all network components, so no data is pending anywhere
* Thirdly drop the client's disk worker (if it's an upload the data should be written already, else we don't care anyways)
*/
provider->finalize_networking(client, slock);
provider->finalize_client_ssl(client);
provider->finalize_file_io(client, slock);
}
debugMessage(LOG_FT, "{} Destroying transfer.", client->log_prefix());

View File

@ -7,7 +7,7 @@
#include <experimental/filesystem>
#include <log/LogUtils.h>
#include "./LocalFileProvider.h"
#include "LocalFileProvider.h"
#include "./LocalFileTransfer.h"
#include "duration_utils.h"
using namespace ts::server::file;
@ -62,7 +62,13 @@ void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) {
provider->disk_io.notify_work_awaiting.wait(qlock, [&]{ return provider->disk_io.state != DiskIOLoopState::RUNNING || provider->disk_io.queue_head != nullptr; });
if(provider->disk_io.queue_head) {
client = provider->disk_io.queue_head->shared_from_this();
try {
client = provider->disk_io.queue_head->shared_from_this();
} catch (std::bad_weak_ptr& ex) {
logCritical(LOG_FT, "Disk worker encountered a bad weak ptr. This indicated something went horribly wrong! Please submit this on https://forum.teaspeak.de !!!");
client.reset();
continue;
}
provider->disk_io.queue_head = provider->disk_io.queue_head->file.next_client;
if(!provider->disk_io.queue_head)
@ -236,7 +242,7 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr
}
}
this->file_system_.lock_file(absolute_path);
this->file_system_->lock_file(absolute_path);
transfer->file.file_locked = true;
if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) {
@ -307,7 +313,7 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr
return FileInitializeResult::SUCCESS;
error_exit:
if(std::exchange(transfer->file.file_locked, false))
this->file_system_.unlock_file(absolute_path);
this->file_system_->unlock_file(absolute_path);
if(file_data.file_descriptor > 0)
::close(file_data.file_descriptor);
@ -319,8 +325,9 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr
void LocalFileTransfer::finalize_file_io(const std::shared_ptr<FileClient> &transfer,
std::unique_lock<std::shared_mutex> &state_lock) {
assert(state_lock.owns_lock());
if(!transfer->transfer)
if(!transfer->transfer) {
return;
}
auto absolute_path = transfer->transfer->absolute_file_path;
@ -337,8 +344,9 @@ void LocalFileTransfer::finalize_file_io(const std::shared_ptr<FileClient> &tran
if(this->disk_io.queue_head == &*transfer) {
this->disk_io.queue_head = file_data.next_client;
if (!this->disk_io.queue_head)
if (!this->disk_io.queue_head) {
this->disk_io.queue_tail = &this->disk_io.queue_head;
}
} else if(file_data.next_client || this->disk_io.queue_tail == &file_data.next_client) {
FileClient* head{this->disk_io.queue_head};
while(head->file.next_client != &*transfer) {
@ -356,8 +364,9 @@ void LocalFileTransfer::finalize_file_io(const std::shared_ptr<FileClient> &tran
}
state_lock.lock();
if(std::exchange(file_data.file_locked, false))
this->file_system_.unlock_file(absolute_path);
if(std::exchange(file_data.file_locked, false)) {
this->file_system_->unlock_file(absolute_path);
}
if(file_data.file_descriptor > 0)
::close(file_data.file_descriptor);
@ -365,18 +374,22 @@ void LocalFileTransfer::finalize_file_io(const std::shared_ptr<FileClient> &tran
}
void LocalFileTransfer::enqueue_disk_io(const std::shared_ptr<FileClient> &client) {
if(!client->file.file_descriptor)
if(!client->file.file_descriptor) {
return;
}
if(!client->transfer)
if(!client->transfer) {
return;
}
if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
if(client->state != FileClient::STATE_TRANSFERRING)
if(client->state != FileClient::STATE_TRANSFERRING) {
return;
}
if(client->disk_buffer.bytes > TRANSFER_MAX_CACHED_BYTES)
if(client->disk_buffer.bytes > TRANSFER_MAX_CACHED_BYTES) {
return;
}
} else if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) {
/* we don't do this check because this might be a flush instruction, where the buffer is actually zero bytes filled */
/*
@ -386,8 +399,9 @@ void LocalFileTransfer::enqueue_disk_io(const std::shared_ptr<FileClient> &clien
}
std::lock_guard dlock{this->disk_io.queue_lock};
if(client->file.next_client || this->disk_io.queue_tail == &client->file.next_client)
if(client->file.next_client || this->disk_io.queue_tail == &client->file.next_client) {
return;
}
*this->disk_io.queue_tail = &*client;
this->disk_io.queue_tail = &client->file.next_client;
@ -399,8 +413,8 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
if(!client->transfer) return;
if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) {
Buffer* buffer{nullptr};
size_t buffer_left_size{0};
Buffer* buffer;
size_t buffer_left_size;
while(true) {
{

View File

@ -11,8 +11,8 @@
#include <include/files/Config.h>
#include "./LocalFileProvider.h"
#include "./duration_utils.h"
#include "HTTPUtils.h"
#include "LocalFileProvider.h"
#include "./HTTPUtils.h"
#include "./LocalFileTransfer.h"
#if defined(TCP_CORK) && !defined(TCP_NOPUSH)
#define TCP_NOPUSH TCP_CORK
@ -549,6 +549,13 @@ void LocalFileTransfer::callback_transfer_network_throttle(int, short, void *ptr
void LocalFileTransfer::callback_transfer_network_read(int fd, short events, void *ptr_transfer) {
auto transfer = reinterpret_cast<FileClient*>(ptr_transfer);
std::shared_ptr<FileClient> client{};
try {
client = transfer->shared_from_this();
} catch (std::bad_weak_ptr& ex) {
logCritical(LOG_FT, "Network read worker encountered a bad weak ptr to a client. This indicated something went horribly wrong! Please submit this on https://forum.teaspeak.de !!!");
return;
}
if((unsigned) events & (unsigned) EV_TIMEOUT) {
/* should never happen, receive timeouts are done via the client tick */
@ -569,7 +576,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
if(read == 0) {
std::unique_lock slock{transfer->state_mutex};
auto original_state = transfer->state;
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true);
transfer->handle->disconnect_client(client, slock, true);
slock.unlock();
switch(original_state) {
@ -588,7 +595,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer->expected_file_size - transfer->transfer->file_offset);
}
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" });
transfer->handle->invoke_aborted_callback(client, { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" });
break;
}
case FileClient::STATE_FLUSHING:
@ -607,7 +614,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
std::unique_lock slock{transfer->state_mutex};
auto original_state = transfer->state;
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true);
transfer->handle->disconnect_client(client, slock, true);
slock.unlock();
switch(original_state) {
@ -627,7 +634,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
}
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) });
transfer->handle->invoke_aborted_callback(client, { TransferError::NETWORK_IO_ERROR, strerror(errno) });
break;
case FileClient::STATE_FLUSHING:
case FileClient::STATE_DISCONNECTED:
@ -649,10 +656,10 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
size_t bytes_buffered{0};
if(transfer->state == FileClient::STATE_AWAITING_KEY) {
bytes_buffered = transfer->handle->handle_transfer_read_raw(transfer->shared_from_this(), buffer, read);
bytes_buffered = transfer->handle->handle_transfer_read_raw(client, buffer, read);
} else if(transfer->state == FileClient::STATE_TRANSFERRING) {
if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) {
bytes_buffered = transfer->handle->handle_transfer_read_raw(transfer->shared_from_this(), buffer, read);
bytes_buffered = transfer->handle->handle_transfer_read_raw(client, buffer, read);
} else {
debugMessage(LOG_FT, "{} Received {} bytes without any need. Dropping them.", transfer->log_prefix(), read);
}
@ -678,6 +685,13 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
void LocalFileTransfer::callback_transfer_network_write(int fd, short events, void *ptr_transfer) {
auto transfer = reinterpret_cast<FileClient*>(ptr_transfer);
std::shared_ptr<FileClient> client{};
try {
client = transfer->shared_from_this();
} catch (std::bad_weak_ptr& ex) {
logCritical(LOG_FT, "Network write worker encountered a bad weak ptr to a client. This indicated something went horribly wrong! Please submit this on https://forum.teaspeak.de !!!");
return;
}
if((unsigned) events & (unsigned) EV_TIMEOUT) {
if(transfer->state == FileClient::STATE_FLUSHING) {
@ -693,11 +707,11 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
if(!std::exchange(transfer->finished_signal_send, true)) {
if(transfer->transfer) {
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, "failed to flush outgoing buffer" });
transfer->handle->invoke_aborted_callback(client, { TransferError::NETWORK_IO_ERROR, "failed to flush outgoing buffer" });
}
}
transfer->handle->test_disconnecting_state(transfer->shared_from_this());
transfer->handle->test_disconnecting_state(client);
return;
}
}
@ -736,12 +750,12 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
logError(LOG_FT, "{} Client disconnected unexpectedly on write. Send {} bytes out of {}.",
transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1);
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" });
transfer->handle->invoke_aborted_callback(client, { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" });
} else {
logError(LOG_FT, "{} Received network write error. Send {} bytes out of {}. Closing transfer.",
transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1);
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) });
transfer->handle->invoke_aborted_callback(client, { TransferError::NETWORK_IO_ERROR, strerror(errno) });
}
} else if(transfer->state == FileClient::STATE_FLUSHING && transfer->transfer) {
{
@ -753,12 +767,12 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
transfer->flush_network_buffer();
if(written == 0) {
logError(LOG_FT, "{} Received unexpected client disconnect while flushing the network buffer. Transfer failed.", transfer->log_prefix());
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" });
transfer->handle->invoke_aborted_callback(client, { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" });
} else {
logError(LOG_FT, "{} Received network write error while flushing the network buffer. Closing transfer.",
transfer->log_prefix());
transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) });
transfer->handle->invoke_aborted_callback(client, { TransferError::NETWORK_IO_ERROR, strerror(errno) });
}
}
@ -773,7 +787,7 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
std::unique_lock slock{transfer->state_mutex};
/* no need to flush anything here, write will only be invoked on a client download */
transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false);
transfer->handle->disconnect_client(client, slock, false);
return;
} else {
buffer->offset += written;
@ -808,19 +822,19 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
if(buffer_left_size > 0) {
transfer->add_network_write_event(false);
} else if(transfer->state == FileClient::STATE_FLUSHING) {
transfer->handle->test_disconnecting_state(transfer->shared_from_this());
transfer->handle->test_disconnecting_state(client);
if(!std::exchange(transfer->finished_signal_send, true)) {
if(transfer->transfer && transfer->statistics.file_transferred.total_bytes + transfer->transfer->file_offset == transfer->transfer->expected_file_size) {
logMessage(LOG_FT, "{} Finished file transfer within {}. Closing connection.", transfer->log_prefix(), duration_to_string(std::chrono::system_clock::now() - transfer->timings.key_received));
transfer->handle->report_transfer_statistics(transfer->shared_from_this());
transfer->handle->report_transfer_statistics(client);
if(auto callback{transfer->handle->callback_transfer_finished}; callback)
callback(transfer->transfer);
}
}
return;
}
transfer->handle->enqueue_disk_io(transfer->shared_from_this());
transfer->handle->enqueue_disk_io(client);
}
}
@ -845,7 +859,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptr<FileCli
logWarning(LOG_FT, "{} Read bytes with unknown protocol. Closing connection.", client->log_prefix());
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, true);
client->handle->disconnect_client(client, slock, true);
return (size_t) -1;
}
@ -853,7 +867,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptr<FileCli
logWarning(LOG_FT, "{} Read bytes with unknown protocol but having not awaiting key state. Closing connection.", client->log_prefix());
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, true);
client->handle->disconnect_client(client, slock, true);
return (size_t) -1;
}
@ -923,7 +937,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptr<FileCli
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, true);
client->handle->disconnect_client(client, slock, true);
return (size_t) -1;
}
@ -1066,7 +1080,7 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr<FileClient>
this->send_http_response(client, response);
if(response.code->code != 200 || !client->transfer) {
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, true);
client->handle->disconnect_client(client, slock, true);
return (size_t) -1;
}
@ -1080,7 +1094,7 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr<FileClient>
logError(LOG_FT, "{} Protocol variable contains invalid protocol for awaiting key state. Disconnecting client.", client->log_prefix());
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, true);
client->handle->disconnect_client(client, slock, true);
return (size_t) -1;
}
} else if(client->state == FileClient::STATE_TRANSFERRING) {
@ -1104,7 +1118,7 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr<FileClient>
callback(client->transfer);
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, true);
client->handle->disconnect_client(client, slock, true);
return client->network_buffer.bytes; /* a bit unexact but the best we could get away with it */
}
@ -1144,7 +1158,7 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr<FileClient>
client->handle->send_http_response(client, response);
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, true);
client->handle->disconnect_client(client, slock, true);
return (size_t) -1;
} else if(client->networking.protocol == FileClient::PROTOCOL_TS_V1) {
@ -1165,7 +1179,7 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr<FileClient>
callback(client->transfer);
std::unique_lock slock{client->state_mutex};
client->handle->disconnect_client(client->shared_from_this(), slock, true);
client->handle->disconnect_client(client, slock, true);
return (size_t) -1;
}

@ -1 +1 @@
Subproject commit 61e4c6bc67e72843bfeef09108db58fd79babf1e
Subproject commit b36b1e19aeb8bd5026437d2bb0b4d91c480ed65b

View File

@ -314,8 +314,9 @@ command_result ConnectedClient::handleCommandServerGroupAdd(Command &cmd) {
ACTION_REQUIRES_GLOBAL_PERMISSION(permission::b_serverinstance_modify_templates, 1);
log_group_type = log::GroupType::TEMPLATE;
} else {
if(!this->server)
if(!this->server) {
return command_result{error::parameter_invalid, "you cant create normal groups on the template server!"};
}
log_group_type = log::GroupType::NORMAL;
}