File transfer server now respects the port and host settings set via the config.yml

This commit is contained in:
WolverinDEV 2020-06-13 01:08:49 +02:00
parent ed7cbd38e8
commit a23002ce66
11 changed files with 263 additions and 122 deletions

View File

@ -413,7 +413,7 @@ namespace ts::server::file {
std::deque<std::shared_ptr<VirtualFileServer>> servers_{};
};
extern bool initialize(std::string& /* error */);
extern bool initialize(std::string& /* error */, const std::string& /* host names */, uint16_t /* port */);
extern void finalize();
extern std::shared_ptr<AbstractFileServer> server();

View File

@ -3,6 +3,7 @@
//
#include <netinet/in.h>
#include <log/LogUtils.h>
#include "LocalFileProvider.h"
using namespace ts::server;
@ -10,13 +11,80 @@ using LocalFileServer = file::LocalFileProvider;
using LocalVirtualFileServer = file::LocalVirtualFileServer;
std::shared_ptr<LocalFileServer> server_instance{};
bool file::initialize(std::string &error) {
bool file::initialize(std::string &error, const std::string& hostnames, uint16_t port) {
server_instance = std::make_shared<LocalFileProvider>();
if(!server_instance->initialize(error)) {
server_instance = nullptr;
return false;
}
bool any_bind{false};
for(const auto& binding : net::resolve_bindings(hostnames, port)) {
if(!get<2>(binding).empty()) {
logError(LOG_FT, "Failed to resolve binding for {}: {}", get<0>(binding), get<2>(binding));
continue;
}
auto result = dynamic_cast<transfer::LocalFileTransfer&>(server_instance->file_transfer()).add_network_binding({ get<0>(binding), get<1>(binding) });
switch (result) {
case transfer::NetworkingBindResult::SUCCESS:
any_bind = true;
break;
case transfer::NetworkingBindResult::OUT_OF_MEMORY:
logWarning(LOG_FT, "Failed to listen to address {}: Out of memory", get<0>(binding));
continue;
case transfer::NetworkingBindResult::FAILED_TO_LISTEN:
logWarning(LOG_FT, "Failed to listen on {}: {}/{}", get<0>(binding), errno, strerror(errno));
continue;
case transfer::NetworkingBindResult::FAILED_TO_BIND:
logWarning(LOG_FT, "Failed to bind on {}: {}/{}", get<0>(binding), errno, strerror(errno));
continue;
case transfer::NetworkingBindResult::BINDING_ALREADY_EXISTS:
logWarning(LOG_FT, "Failed to bind on {}: binding already exists", get<0>(binding));
continue;
case transfer::NetworkingBindResult::NETWORKING_NOT_INITIALIZED:
logWarning(LOG_FT, "Failed to bind on {}: networking not initialized", get<0>(binding));
continue;
case transfer::NetworkingBindResult::FAILED_TO_ALLOCATE_SOCKET:
logWarning(LOG_FT, "Failed to allocate a socket for {}: {}/{}", get<0>(binding), errno, strerror(errno));
continue;
}
}
#if 0
{
auto query_bindings_string = this->properties()[property::SERVERINSTANCE_QUERY_HOST].as<string>();
auto query_port = this->properties()[property::SERVERINSTANCE_QUERY_PORT].as<uint16_t>();
auto query_bindings = net::resolve_bindings(query_bindings_string, query_port);
deque<shared_ptr<QueryServer::Binding>> bindings;
for(auto& binding : query_bindings) {
if(!get<2>(binding).empty()) {
logError(LOG_QUERY, "Failed to resolve binding for {}: {}", get<0>(binding), get<2>(binding));
continue;
}
auto entry = make_shared<QueryServer::Binding>();
memcpy(&entry->address, &get<1>(binding), sizeof(sockaddr_storage));
entry->file_descriptor = -1;
entry->event_accept = nullptr;
bindings.push_back(entry);
}
logMessage(LOG_QUERY, "Starting server on {}:{}", query_bindings_string, query_port);
if(!queryServer->start(bindings, errorMessage)) {
logCritical(LOG_QUERY, "Failed to start query server: {}", errorMessage);
return false;
}
}
#endif
return true;
}
@ -32,28 +100,13 @@ std::shared_ptr<file::AbstractFileServer> file::server() {
}
LocalFileServer::LocalFileProvider() : file_system_{}, file_transfer_{this->file_system_} {}
LocalFileServer::~LocalFileProvider() {}
LocalFileServer::~LocalFileProvider() = default;
bool LocalFileServer::initialize(std::string &error) {
if(!this->file_system_.initialize(error, "files/"))
return false;
std::deque<std::shared_ptr<transfer::NetworkBinding>> bindings{};
{
auto binding = std::make_shared<transfer::NetworkBinding>();
binding->hostname = "0.0.0.0";
auto& iaddr = *(sockaddr_in*) &binding->address;
iaddr.sin_family = AF_INET;
iaddr.sin_port = htons(30303);
iaddr.sin_addr.s_addr = INADDR_ANY;
bindings.push_back(std::move(binding));
}
if(!this->file_transfer_.start(bindings)) {
if(!this->file_transfer_.start()) {
error = "transfer server startup failed";
this->file_system_.finalize();
return false;

View File

@ -11,6 +11,7 @@
#include <misc/net.h>
#include <misc/spin_mutex.h>
#include <random>
#include <misc/memtracker.h>
#include "./NetTools.h"
#define TRANSFER_MAX_CACHED_BYTES (1024 * 1024 * 1) // Buffer up to 1mb
@ -243,7 +244,7 @@ namespace ts::server::file {
std::chrono::system_clock::time_point disconnecting{};
} timings;
explicit FileClient(LocalFileTransfer* handle) : handle{handle} {}
explicit FileClient(LocalFileTransfer* handle) : handle{handle} { memtrack::allocated<FileClient>(this); }
~FileClient();
void add_network_write_event(bool /* ignore bandwidth limits */);
@ -270,8 +271,24 @@ namespace ts::server::file {
enum struct NetworkingStartResult {
SUCCESS,
OUT_OF_MEMORY
};
enum struct NetworkingBindResult {
SUCCESS,
BINDING_ALREADY_EXISTS,
NETWORKING_NOT_INITIALIZED,
FAILED_TO_ALLOCATE_SOCKET, /* errno is set */
FAILED_TO_BIND,
FAILED_TO_LISTEN,
OUT_OF_MEMORY,
NO_BINDINGS
};
enum struct NetworkingUnbindResult {
SUCCESS,
UNKNOWN_BINDING
};
enum struct ClientWorkerStartResult {
@ -364,7 +381,12 @@ namespace ts::server::file {
/* UNKNOWN ERROR */
};
struct NetworkBinding : std::enable_shared_from_this<NetworkBinding> {
struct NetworkBinding {
std::string hostname{};
sockaddr_storage address{};
};
struct ActiveNetworkBinding : std::enable_shared_from_this<ActiveNetworkBinding> {
std::string hostname{};
sockaddr_storage address{};
@ -379,9 +401,13 @@ namespace ts::server::file {
explicit LocalFileTransfer(filesystem::LocalFileSystem&);
~LocalFileTransfer();
[[nodiscard]] bool start(const std::deque<std::shared_ptr<NetworkBinding>>& /* bindings */);
[[nodiscard]] bool start();
void stop();
[[nodiscard]] NetworkingBindResult add_network_binding(const NetworkBinding& /* binding */);
[[nodiscard]] std::vector<NetworkBinding> active_network_bindings();
[[nodiscard]] NetworkingUnbindResult remove_network_binding(const NetworkBinding& /* binding */);
std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>>
initialize_channel_transfer(Transfer::Direction direction, const std::shared_ptr<VirtualFileServer>& server, ChannelId channelId,
const TransferInfo &info) override;
@ -430,11 +456,13 @@ namespace ts::server::file {
} disconnect;
struct {
std::mutex mutex;
bool active{false};
std::thread dispatch_thread{};
struct event_base* event_base{nullptr};
std::deque<std::shared_ptr<NetworkBinding>> bindings{};
std::deque<std::shared_ptr<ActiveNetworkBinding>> bindings{};
} network{};
struct {

View File

@ -7,7 +7,6 @@
#include <log/LogUtils.h>
#include <random>
#include "./LocalFileProvider.h"
#include "LocalFileProvider.h"
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
@ -42,12 +41,13 @@ FileClient::~FileClient() {
assert(!this->networking.event_write);
assert(this->state == STATE_DISCONNECTED);
memtrack::freed<FileClient>(this);
}
LocalFileTransfer::LocalFileTransfer(filesystem::LocalFileSystem &fs) : file_system_{fs} {}
LocalFileTransfer::~LocalFileTransfer() = default;
bool LocalFileTransfer::start(const std::deque<std::shared_ptr<NetworkBinding>>& bindings) {
bool LocalFileTransfer::start() {
(void) this->start_client_worker();
{
@ -66,17 +66,15 @@ bool LocalFileTransfer::start(const std::deque<std::shared_ptr<NetworkBinding>>&
{
this->network.bindings = bindings;
auto start_result = this->start_networking();
switch (start_result) {
case NetworkingStartResult::SUCCESS:
break;
case NetworkingStartResult::OUT_OF_MEMORY:
logError(LOG_FT, "Failed to start networking (Out of memory)");
goto error_exit_network;
case NetworkingStartResult::NO_BINDINGS:
logError(LOG_FT, "Failed to start networking (No address could be bound)");
goto error_exit_network;
default:
logError(LOG_FT, "Failed to start networking ({})", (int) start_result);
goto error_exit_network;

View File

@ -168,80 +168,142 @@ void FileClient::flush_network_buffer() {
}
NetworkingStartResult LocalFileTransfer::start_networking() {
std::lock_guard nlock{this->network.mutex};
assert(!this->network.active);
this->network.active = true;
this->network.event_base = event_base_new();
if(!this->network.event_base) return NetworkingStartResult::OUT_OF_MEMORY;
bool bound{false};
for(auto& binding : this->network.bindings) {
binding->file_descriptor = socket(binding->address.ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
if(!binding->file_descriptor) {
logWarning(LOG_FT, "Failed to allocate socket for {}: {}/{}", binding->hostname, errno, strerror(errno));
continue;
this->network.dispatch_thread = std::thread(&LocalFileTransfer::dispatch_loop_network, this);
return NetworkingStartResult::SUCCESS;
}
NetworkingBindResult LocalFileTransfer::add_network_binding(const NetworkBinding &binding) {
std::lock_guard nlock{this->network.mutex};
if(!this->network.active)
return NetworkingBindResult::NETWORKING_NOT_INITIALIZED;
for(const auto& abinding : this->network.bindings) {
if(net::address_equal(abinding->address, binding.address) && net::port(abinding->address) == net::port(binding.address))
return NetworkingBindResult::BINDING_ALREADY_EXISTS;
}
NetworkingBindResult result{NetworkingBindResult::SUCCESS};
auto abinding = std::make_shared<ActiveNetworkBinding>();
abinding->handle = this;
abinding->hostname = binding.hostname;
memcpy(&abinding->address, &binding.address, sizeof(binding.address));
abinding->file_descriptor = socket(abinding->address.ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
if(!abinding->file_descriptor) {
//logWarning(LOG_FT, "Failed to allocate socket for {}: {}/{}", abinding->hostname, errno, strerror(errno));
return NetworkingBindResult::FAILED_TO_ALLOCATE_SOCKET;
}
int enable = 1, disabled = 0;
if (setsockopt(binding->file_descriptor, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
logWarning(LOG_FT, "Failed to activate SO_REUSEADDR for binding {} ({} | {})", binding->hostname, errno, strerror(errno));
if (setsockopt(abinding->file_descriptor, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
logWarning(LOG_FT, "Failed to activate SO_REUSEADDR for binding {} ({}/{})", abinding->hostname, errno, strerror(errno));
if(setsockopt(binding->file_descriptor, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0)
logWarning(LOG_FT, "Failed to deactivate TCP_NOPUSH for binding {} ({} | {})", binding->hostname, errno, strerror(errno));
if(setsockopt(abinding->file_descriptor, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0)
logWarning(LOG_FT, "Failed to deactivate TCP_NOPUSH for binding {} ({}/{})", abinding->hostname, errno, strerror(errno));
if(binding->address.ss_family == AF_INET6) {
if(setsockopt(binding->file_descriptor, IPPROTO_IPV6, IPV6_V6ONLY, &enable, sizeof(int)) < 0)
logWarning(LOG_FT, "Failed to activate IPV6_V6ONLY for IPv6 binding {} ({} | {})", binding->hostname, errno, strerror(errno));
if(abinding->address.ss_family == AF_INET6) {
if(setsockopt(abinding->file_descriptor, IPPROTO_IPV6, IPV6_V6ONLY, &enable, sizeof(int)) < 0)
logWarning(LOG_FT, "Failed to activate IPV6_V6ONLY for IPv6 binding {} ({}/{})", abinding->hostname, errno, strerror(errno));
}
if(fcntl(binding->file_descriptor, F_SETFD, FD_CLOEXEC) < 0)
logWarning(LOG_FT, "Failed to set flag FD_CLOEXEC for binding {} ({} | {})", binding->hostname, errno, strerror(errno));
if(fcntl(abinding->file_descriptor, F_SETFD, FD_CLOEXEC) < 0)
logWarning(LOG_FT, "Failed to set flag FD_CLOEXEC for binding {} ({}/{})", abinding->hostname, errno, strerror(errno));
if (bind(binding->file_descriptor, (struct sockaddr *) &binding->address, sizeof(binding->address)) < 0) {
logError(LOG_FT, "Failed to bind server to {}. (Failed to bind socket: {} | {})", binding->hostname, errno, strerror(errno));
if (bind(abinding->file_descriptor, (struct sockaddr *) &abinding->address, sizeof(abinding->address)) < 0) {
//logError(LOG_FT, "Failed to bind server to {}. (Failed to bind socket: {}/{})", binding->hostname, errno, strerror(errno));
result = NetworkingBindResult::FAILED_TO_BIND;
goto reset_binding;
}
if (listen(binding->file_descriptor, 8) < 0) {
logError(LOG_FT, "Failed to bind server to {}. (Failed to listen: {} | {})", binding->hostname, errno, strerror(errno));
if (listen(abinding->file_descriptor, 8) < 0) {
//logError(LOG_FT, "Failed to bind server to {}. (Failed to listen: {}/{})", binding->hostname, errno, strerror(errno));
result = NetworkingBindResult::FAILED_TO_LISTEN;
goto reset_binding;
}
binding->handle = this;
binding->accept_event = event_new(this->network.event_base, binding->file_descriptor, (unsigned) EV_READ | (unsigned) EV_PERSIST, &LocalFileTransfer::callback_transfer_network_accept, &*binding);
if(!binding->accept_event)
abinding->accept_event = event_new(this->network.event_base, abinding->file_descriptor, (unsigned) EV_READ | (unsigned) EV_PERSIST, &LocalFileTransfer::callback_transfer_network_accept, &*abinding);
if(!abinding->accept_event) {
result = NetworkingBindResult::OUT_OF_MEMORY;
goto reset_binding;
}
event_add(binding->accept_event, nullptr);
logMessage(LOG_FT, "Started to listen on {}:{}", binding->hostname, net::port(binding->address));
event_add(abinding->accept_event, nullptr);
logMessage(LOG_FT, "Started to listen on {}:{}", abinding->hostname, net::port(abinding->address));
this->network.bindings.push_back(std::move(abinding));
bound = true;
continue;
return NetworkingBindResult::SUCCESS;
reset_binding:
if(binding->accept_event) {
event_free(binding->accept_event);
binding->accept_event = nullptr;
if(abinding->accept_event) {
event_free(abinding->accept_event);
abinding->accept_event = nullptr;
}
if(binding->file_descriptor > 0)
::close(binding->file_descriptor);
binding->file_descriptor = 0;
if(abinding->file_descriptor > 0)
::close(abinding->file_descriptor);
abinding->file_descriptor = 0;
abinding->handle = nullptr;
return result;
}
binding->handle = nullptr;
}
if(!bound) {
event_base_free(std::exchange(this->network.event_base, nullptr));
return NetworkingStartResult::NO_BINDINGS;
std::vector<NetworkBinding> LocalFileTransfer::active_network_bindings() {
std::lock_guard nlock{this->network.mutex};
std::vector<NetworkBinding> result{};
result.reserve(this->network.bindings.size());
for(const auto& binding : this->network.bindings) {
auto& rbinding = result.emplace_back();
rbinding.hostname = binding->hostname;
memcpy(&rbinding.address, &binding->address, sizeof(rbinding.address));
}
this->network.dispatch_thread = std::thread(&LocalFileTransfer::dispatch_loop_network, this);
return NetworkingStartResult::SUCCESS;
return result;
}
NetworkingUnbindResult LocalFileTransfer::remove_network_binding(const NetworkBinding &binding) {
std::lock_guard nlock{this->network.mutex};
std::shared_ptr<ActiveNetworkBinding> abinding{};
for(auto it = this->network.bindings.begin(); it != this->network.bindings.end(); it++) {
abinding = *it;
if(net::address_equal(abinding->address, binding.address) && net::port(abinding->address) == net::port(binding.address)) {
this->network.bindings.erase(it);
break;
}
abinding = nullptr;
}
if(!abinding)
return NetworkingUnbindResult::UNKNOWN_BINDING;
if(abinding->accept_event) {
event_del_block(abinding->accept_event);
event_free(abinding->accept_event);
abinding->accept_event = nullptr;
}
if(abinding->file_descriptor > 0)
::close(abinding->file_descriptor);
abinding->file_descriptor = 0;
abinding->handle = nullptr;
return NetworkingUnbindResult::SUCCESS;
}
void LocalFileTransfer::shutdown_networking() {
event_base* ev_base;
std::thread dispatch_thread{};
{
std::lock_guard nlock{this->network.mutex};
if(!this->network.active) return;
this->network.active = false;
@ -255,9 +317,13 @@ void LocalFileTransfer::shutdown_networking() {
if(binding->file_descriptor > 0)
::close(binding->file_descriptor);
binding->file_descriptor = 0;
binding->handle = nullptr;
}
this->network.bindings.clear();
ev_base = std::exchange(this->network.event_base, nullptr);
std::swap(this->network.dispatch_thread, dispatch_thread);
}
{
std::unique_lock tlock{this->transfers_mutex};
@ -269,12 +335,11 @@ void LocalFileTransfer::shutdown_networking() {
}
}
auto ev_base = std::exchange(this->network.event_base, nullptr);
if(ev_base)
event_base_loopbreak(ev_base);
if(this->network.dispatch_thread.joinable())
this->network.dispatch_thread.join();
if(dispatch_thread.joinable())
dispatch_thread.join();
if(ev_base)
event_base_free(ev_base);
@ -428,7 +493,7 @@ void LocalFileTransfer::finalize_client_ssl(const std::shared_ptr<FileClient> &c
}
void LocalFileTransfer::callback_transfer_network_accept(int fd, short, void *ptr_binding) {
auto binding = reinterpret_cast<NetworkBinding*>(ptr_binding);
auto binding = reinterpret_cast<ActiveNetworkBinding*>(ptr_binding);
auto transfer = binding->handle;
sockaddr_storage address{};

View File

@ -3,7 +3,6 @@
//
#include <files/FileServer.h>
#include <log/LogUtils.h>
#include <files/Config.h>
#include "./client/ConnectedClient.h"
@ -15,15 +14,10 @@ using namespace ts::server::file;
FileServerHandler::FileServerHandler(ts::server::InstanceHandler *instance) : instance_{instance} {}
bool FileServerHandler::initialize(std::string &error) {
/*
* FIXME: Ports etc!
*
auto bindings_string = this->properties()[property::SERVERINSTANCE_FILETRANSFER_HOST].as<string>();
auto port = this->properties()[property::SERVERINSTANCE_FILETRANSFER_PORT].as<uint16_t>();
*/
if(!file::initialize(error))
if(!file::initialize(error, serverInstance->properties()[property::SERVERINSTANCE_FILETRANSFER_HOST].as<std::string>(), serverInstance->properties()[property::SERVERINSTANCE_FILETRANSFER_PORT].as<uint16_t>()))
return false;
file::config::ssl_option_supplier = [&]{
return this->instance_->sslManager()->web_ssl_options();
};

View File

@ -58,7 +58,6 @@ namespace ts {
class InstanceHandler;
class VoiceServer;
class QueryServer;
class LocalFileServer;
class SpeakingClient;
class WebControlServer;

View File

@ -259,8 +259,8 @@ command_result ConnectedClient::handleCommandServerRequestConnectionInfo(Command
first_bulk.put_unchecked(property::CONNECTION_FILETRANSFER_BANDWIDTH_SENT, minute_report.file_bytes_sent);
first_bulk.put_unchecked(property::CONNECTION_FILETRANSFER_BANDWIDTH_RECEIVED, minute_report.file_bytes_received);
first_bulk.put_unchecked(property::CONNECTION_FILETRANSFER_BYTES_SENT_TOTAL, minute_report.file_bytes_sent);
first_bulk.put_unchecked(property::CONNECTION_FILETRANSFER_BYTES_RECEIVED_TOTAL, minute_report.file_bytes_received);
first_bulk.put_unchecked(property::CONNECTION_FILETRANSFER_BYTES_SENT_TOTAL, this->server->properties()[property::VIRTUALSERVER_TOTAL_BYTES_DOWNLOADED].as<string>());
first_bulk.put_unchecked(property::CONNECTION_FILETRANSFER_BYTES_RECEIVED_TOTAL, this->server->properties()[property::VIRTUALSERVER_TOTAL_BYTES_UPLOADED].as<string>());
first_bulk.put_unchecked("connection_filetransfer_bytes_sent_month", this->server->properties()[property::VIRTUALSERVER_MONTH_BYTES_DOWNLOADED].as<string>());
first_bulk.put_unchecked("connection_filetransfer_bytes_received_month", this->server->properties()[property::VIRTUALSERVER_MONTH_BYTES_UPLOADED].as<string>());

View File

@ -190,6 +190,8 @@ void VoiceBridge::handle_data_channel(const std::shared_ptr<rtc::DataChannel> &c
weak_ptr<rtc::DataChannel> weak_channel = channel;
channel->callback_binary = [&, weak_channel](const pipes::buffer_view& buffer) {
if(buffer.length() < 2)
return;
this->callback_voice_data(buffer.view(2), buffer[0] == 1, buffer[1] == 1); /* buffer.substr(2), buffer[0] == 1, buffer[1] == 1 */
};

View File

@ -28,6 +28,8 @@ WebClient::WebClient(WebControlServer* server, int fd) : SpeakingClient(server->
assert(server->getTS());
this->state = ConnectionState::INIT_LOW;
this->file_descriptor = fd;
debugMessage(this->server->getServerId(), " Creating WebClient instance at {}", (void*) this);
}
void WebClient::initialize() {
@ -120,6 +122,7 @@ void WebClient::initialize() {
WebClient::~WebClient() {
memtrack::freed<WebClient>(this);
debugMessage(this->server->getServerId(), " Destroying WebClient instance at {}", (void*) this);
}
static Json::StreamWriterBuilder stream_write_builder = []{

View File

@ -11,8 +11,6 @@
#include "../SignalHandler.h"
#include "../client/ConnectedClient.h"
#include "../InstanceHandler.h"
#include "../VirtualServerManager.h"
#include "../VirtualServer.h"
#include "../ShutdownHelper.h"
#include "../server/QueryServer.h"
@ -385,6 +383,7 @@ namespace terminal {
resident_set = rss * page_size_kb;
}
//meminfo track
bool handleCommandMemInfo(TerminalCommand& cmd){
bool flag_base = false, flag_malloc = false, flag_track = false, flag_buffer = false;