Fle server & Query server improvements

This commit is contained in:
WolverinDEV 2019-07-21 10:43:26 +02:00
parent e507d1f75d
commit d4d6978d5f
18 changed files with 1138 additions and 313 deletions

View File

@ -963,8 +963,8 @@ std::deque<std::shared_ptr<EntryBinding>> config::create_bindings() {
}
{
CREATE_BINDING("host", 0);
BIND_STRING(config::binding::DefaultQueryHost, "0.0.0.0");
ADD_NOTE("Multibinding like the voice server isnt supported yet!");
BIND_STRING(config::binding::DefaultQueryHost, "0.0.0.0,[::]");
ADD_NOTE("Multibinding supported here! Host delimiter is \",\"");
}
}
{
@ -975,8 +975,8 @@ std::deque<std::shared_ptr<EntryBinding>> config::create_bindings() {
}
{
CREATE_BINDING("host", 0);
BIND_STRING(config::binding::DefaultFileHost, "0.0.0.0");
ADD_NOTE("Multibinding like the voice server isnt supported yet!");
BIND_STRING(config::binding::DefaultFileHost, "0.0.0.0,[::]");
ADD_NOTE("Multibinding supported here! Host delimiter is \",\"");
}
}
}

View File

@ -212,6 +212,17 @@ InstanceHandler::~InstanceHandler() {
tick_manager = nullptr;
}
inline string strip(std::string message) {
while(!message.empty()) {
if(message[0] == ' ')
message = message.substr(1);
else if(message[message.length() - 1] == ' ')
message = message.substr(0, message.length() - 1);
else break;
}
return message;
}
inline sockaddr_in* resolveAddress(const string& host, uint16_t port) {
hostent* record = gethostbyname(host.c_str());
if (!record) {
@ -225,12 +236,24 @@ inline sockaddr_in* resolveAddress(const string& host, uint16_t port) {
return addr;
}
inline vector<string> split_hosts(const std::string& message, char delimiter) {
vector<string> result;
size_t found, index = 0;
do {
found = message.find(delimiter, index);
result.push_back(strip(message.substr(index, found - index)));
index = found + 1;
} while(index != 0);
return result;
}
bool InstanceHandler::startInstance() {
if (this->active)
return false;
active = true;
this->web_list->enabled = ts::config::server::enable_teamspeak_weblist;
string errorMessage;
this->sslMgr = new ssl::SSLManager();
if(!this->sslMgr->initialize()) {
logCritical("Failed to initialize ssl manager.");
@ -243,21 +266,32 @@ bool InstanceHandler::startInstance() {
return false;
}
//Startup file server
sockaddr_in *fAddr = resolveAddress(this->properties()[property::SERVERINSTANCE_FILETRANSFER_HOST].as<string>(), this->properties()[property::SERVERINSTANCE_FILETRANSFER_PORT].as<uint16_t>());
if (!fAddr) {
logCritical(LOG_FT, "Could not resolve file server host");
return false;
}
logMessage(LOG_FT, "Starting server on {}:{}", inet_ntoa(fAddr->sin_addr), ntohs(fAddr->sin_port));
fileServer = new ts::server::FileServer();
if (!fileServer->start(*fAddr)) {
logCritical(LOG_FT, "Failed to start file server.");
delete fAddr;
return false;
{
auto bindings_string = this->properties()[property::SERVERINSTANCE_FILETRANSFER_HOST].as<string>();
auto port = this->properties()[property::SERVERINSTANCE_FILETRANSFER_PORT].as<uint16_t>();
auto ft_bindings = net::resolve_bindings(bindings_string, port);
deque<shared_ptr<FileServer::Binding>> bindings;
for(auto& binding : ft_bindings) {
if(!get<2>(binding).empty()) {
logError(LOG_FT, "Failed to resolve binding for {}: {}", get<0>(binding), get<2>(binding));
continue;
}
auto entry = make_shared<FileServer::Binding>();
memcpy(&entry->address, &get<1>(binding), sizeof(sockaddr_storage));
entry->file_descriptor = -1;
entry->event_accept = nullptr;
bindings.push_back(entry);
}
logMessage(LOG_FT, "Starting server on {}:{}", bindings_string, port);
if(!fileServer->start(bindings, errorMessage)) {
logCritical(LOG_FT, "Failed to start server: {}", errorMessage);
return false;
}
}
delete fAddr;
if(config::query::sslMode > 0) {
string error;
@ -271,7 +305,6 @@ bool InstanceHandler::startInstance() {
}
}
string errorMessage;
queryServer = new ts::server::QueryServer(this->getSql());
{
auto server_query = queryServer->find_query_account_by_name("serveradmin");
@ -288,18 +321,32 @@ bool InstanceHandler::startInstance() {
}
}
}
sockaddr_in *qAddr = resolveAddress(this->properties()[property::SERVERINSTANCE_QUERY_HOST].as<string>(), this->properties()[property::SERVERINSTANCE_QUERY_PORT].as<uint16_t>());
if (!qAddr) {
logCritical(LOG_QUERY, "Could not resolve query server host");
return false;
{
auto query_bindings_string = this->properties()[property::SERVERINSTANCE_QUERY_HOST].as<string>();
auto query_port = this->properties()[property::SERVERINSTANCE_QUERY_PORT].as<uint16_t>();
auto query_bindings = net::resolve_bindings(query_bindings_string, query_port);
deque<shared_ptr<QueryServer::Binding>> bindings;
for(auto& binding : query_bindings) {
if(!get<2>(binding).empty()) {
logError(LOG_QUERY, "Failed to resolve binding for {}: {}", get<0>(binding), get<2>(binding));
continue;
}
auto entry = make_shared<QueryServer::Binding>();
memcpy(&entry->address, &get<1>(binding), sizeof(sockaddr_storage));
entry->file_descriptor = -1;
entry->event_accept = nullptr;
bindings.push_back(entry);
}
logMessage(LOG_QUERY, "Starting server on {}:{}", query_bindings_string, query_port);
if(!queryServer->start(bindings, errorMessage)) {
logCritical(LOG_QUERY, "Failed to start query server: {}", errorMessage);
return false;
}
}
logMessage(LOG_QUERY, "Starting server on {}:{}", inet_ntoa(qAddr->sin_addr), ntohs(qAddr->sin_port));
if (!queryServer->start(*qAddr, errorMessage)) {
logCritical(LOG_QUERY, "Could not start Query server.\nMessage: " + errorMessage);
delete qAddr;
return false;
}
delete qAddr;
#ifdef COMPILE_WEB_CLIENT
if(config::web::activated) {

View File

@ -11,9 +11,10 @@
#include <misc/base64.h>
#include "weblist/WebListManager.h"
#include "client/voice/VoiceClient.h"
#include "client/InternalClient.h"
#include "client/music/MusicClient.h"
#include "./client/web/WebClient.h"
#include "./client/voice/VoiceClient.h"
#include "./client/InternalClient.h"
#include "./client/music/MusicClient.h"
#include "music/MusicBotManager.h"
#include "server/VoiceServer.h"
#include "server/file/FileServer.h"
@ -869,7 +870,8 @@ vector<pair<ts::permission::PermissionType, ts::permission::v2::PermissionFlagge
if(found_negate) {
server_group_data.erase(remove_if(server_group_data.begin(), server_group_data.end(), [](auto data) { return !std::get<2>(data); }), server_group_data.end());
logTrace(this->serverId, "[Permission] Found negate flag within server groups. Groups left: {}", server_group_data.size());
sassert(!server_group_data.empty()); /* this should never happen! */
if(server_group_data.empty())
logTrace(this->serverId, "[Permission] After non negated groups have been kicked out the negated groups are empty! This should not happen! Permission: {}, Client ID: {}", permission_type, client_dbid);
permission::PermissionValue current_lowest = 0;
for(auto& group : server_group_data) {
if(!active_server_group || (std::get<3>(group) < current_lowest && std::get<3>(group) != -1)) {
@ -1035,9 +1037,14 @@ float TSServer::averagePing() {
float sum = 0;
this->forEachClient([&count, &sum](shared_ptr<ConnectedClient> client) {
if(client->getType() != ClientType::CLIENT_TEAMSPEAK) return;
count++;
sum += duration_cast<milliseconds>(dynamic_pointer_cast<VoiceClient>(client)->calculatePing()).count();
auto type = client->getType();
if(type == ClientType::CLIENT_TEAMSPEAK || type == ClientType::CLIENT_TEASPEAK) {
count++;
sum += duration_cast<milliseconds>(dynamic_pointer_cast<VoiceClient>(client)->calculatePing()).count();
} else if(type == ClientType::CLIENT_WEB) {
count++;
sum += duration_cast<milliseconds>(dynamic_pointer_cast<WebClient>(client)->client_ping()).count();
}
});
if(count == 0) return 0;

View File

@ -3471,9 +3471,23 @@ CommandResult ConnectedClient::handleCommandFTInitUpload(Command &cmd) {
Command result(this->getExternalType() == CLIENT_TEAMSPEAK ? "notifystartupload" : "");
result["clientftfid"] = cmd["clientftfid"].as<uint64_t>();
result["ftkey"] = key->key;
result["port"] = ntohs(serverInstance->getFileServer()->boundedAddress()->sin_port);
if(serverInstance->getFileServer()->boundedAddress()->sin_addr.s_addr != 0)
result["ip"] = inet_ntoa(serverInstance->getFileServer()->boundedAddress()->sin_addr) + string(",");
auto bindings = serverInstance->getFileServer()->list_bindings();
if(!bindings.empty()) {
result["port"] = net::port(bindings[0]->address);
string ip = "";
for(auto& entry : bindings) {
if(net::is_anybind(entry->address)) {
ip = "";
break;
}
ip += net::to_string(entry->address, false) + ",";
}
if(!ip.empty())
result["ip"] = ip;
} else {
return {findError("server_is_not_running"), "file server is not bound to any address"};
}
result["seekpos"] = 0;
result["proto"] = 1;
result["serverftfid"] = key->key_id; //TODO generate!
@ -3549,9 +3563,24 @@ CommandResult ConnectedClient::handleCommandFTInitDownload(Command &cmd) {
result["proto"] = 1;
result["serverftfid"] = key->key_id;
result["ftkey"] = key->key;
result["port"] = ntohs(serverInstance->getFileServer()->boundedAddress()->sin_port);
if(serverInstance->getFileServer()->boundedAddress()->sin_addr.s_addr != 0)
result["ip"] = inet_ntoa(serverInstance->getFileServer()->boundedAddress()->sin_addr) + string(",");
auto bindings = serverInstance->getFileServer()->list_bindings();
if(!bindings.empty()) {
result["port"] = net::port(bindings[0]->address);
string ip = "";
for(auto& entry : bindings) {
if(net::is_anybind(entry->address)) {
ip = "";
break;
}
ip += net::to_string(entry->address, false) + ",";
}
if(!ip.empty())
result["ip"] = ip;
} else {
return {findError("server_is_not_running"), "file server is not bound to any address"};
}
result["size"] = key->size;
this->sendCommand(result);

View File

@ -101,7 +101,6 @@ size_t FileClient::used_bandwidth() {
}
std::string FileClient::client_prefix() {
auto ip_address = net::to_string(this->remoteAddress.sin_addr);
bool hide_ip = config::server::disable_ip_saving;
if(!hide_ip) {
auto client = this->client;
@ -112,10 +111,13 @@ std::string FileClient::client_prefix() {
}
}
}
std::string address = "";
if(hide_ip)
ip_address = "X.X.X.X";
if(this->client) return "[" + to_string(this->client->getServerId()) + "|" + ip_address + ":" + to_string(htons(this->remoteAddress.sin_port)) + "| " + this->client->getDisplayName() + "]";
return "[0|" + ip_address + ":" + to_string(htons(this->remoteAddress.sin_port)) + "|unconnected]";
address = "X.X.X.X:" + to_string(net::port(this->remote_address));
else
address = net::to_string(this->remote_address);
if(this->client) return "[" + to_string(this->client->getServerId()) + "|" + address + "| " + this->client->getDisplayName() + "]";
return "[0|" + address + "|unconnected]";
}
size_t FileClient::transferred_bytes() {

View File

@ -89,7 +89,7 @@ namespace ts {
std::recursive_mutex bandwidth_lock;
std::deque<std::unique_ptr<BandwidthEntry>> bandwidth;
sockaddr_in remoteAddress;
sockaddr_storage remote_address;
int clientFd;
bool event_read_hold = false;

View File

@ -166,9 +166,9 @@ void FileClient::handleMessageRead(int fd, short, void *) {
}
if(this->state_connection == C_CONNECTED) {
if(this->state_transfer == T_TRANSFER)
logError(LOG_FT, "{} Transfer hang up. Remote peer closed the connection.", this->client_prefix());
logWarning(LOG_FT, "{} Transfer hang up. Remote peer closed the connection.", this->client_prefix());
else
logTrace(LOG_FT, "{} Received notification that the remote peer has closed the connection", this->client_prefix());
logMessage(LOG_FT, "{} Remote peer has closed the connection before initializing a transfer.", this->client_prefix());
self->disconnect(seconds(3));
}
return;

View File

@ -262,6 +262,18 @@ void WebClient::tick(const std::chrono::system_clock::time_point& point) {
this->ws_handler.send({pipes::PING, {buffer, 2}});
}
}
if(this->js_ping.last_request + seconds(1) < point) {
if(this->js_ping.last_response > this->js_ping.last_request || this->js_ping.last_request + this->js_ping.timeout < point) {
this->js_ping.current_id++;
this->js_ping.last_request = point;
Json::Value jsonCandidate;
jsonCandidate["type"] = "ping";
jsonCandidate["payload"] = to_string(this->js_ping.current_id);
this->sendJson(jsonCandidate);
}
}
}
void WebClient::onWSConnected() {
@ -554,6 +566,36 @@ void WebClient::handleMessage(const std::string &message) {
}
this->voice_bridge->remote_ice_finished();
}
} else if(val["type"].asString() == "ping") {
Json::Value response;
response["type"] = "pong";
response["payload"] = val["payload"];
response["ping_native"] = to_string(duration_cast<microseconds>(this->ping.value).count());
this->sendJson(response);
return;
} else if(val["type"].asString() == "pong") {
auto payload = val["payload"].isString() ? val["payload"].asString() : "";
uint8_t response_id = 0;
try {
response_id = (uint8_t) stoul(payload);
} catch(std::exception& ex) {
debugMessage(this->getServerId(), "[{}] Failed to parse pong payload.");
return;
}
if(response_id != this->js_ping.current_id) {
debugMessage(
this->getServerId(),
"{} Received pong on web socket from javascript which is older than the last request. Delay may over {}ms? (Index: {}, Current index: {})",
CLIENT_STR_LOG_PREFIX,
duration_cast<milliseconds>(this->js_ping.timeout).count(),
response_id,
this->js_ping.current_id
);
return;
}
this->js_ping.last_response = system_clock::now();
this->js_ping.value = duration_cast<nanoseconds>(this->js_ping.last_response - this->js_ping.last_request);
}
} catch (const std::exception& ex) {
logError(this->server->getServerId(), "Could not handle json packet! Message {}", ex.what());

View File

@ -32,6 +32,9 @@ namespace ts {
bool shouldReceiveVoice(const std::shared_ptr<ConnectedClient> &sender) override;
inline std::chrono::nanoseconds client_ping() { return this->client_ping_layer_7(); }
inline std::chrono::nanoseconds client_ping_layer_5() { return this->ping.value; }
inline std::chrono::nanoseconds client_ping_layer_7() { return this->js_ping.value; }
protected:
void handlePacketVoiceWhisper(const pipes::buffer_view &string, bool) override;
@ -60,6 +63,15 @@ namespace ts {
std::chrono::nanoseconds timeout{2000};
} ping;
struct {
uint8_t current_id = 0;
std::chrono::system_clock::time_point last_request;
std::chrono::system_clock::time_point last_response;
std::chrono::nanoseconds value;
std::chrono::nanoseconds timeout{2000};
} js_ping;
std::mutex queue_lock;
std::deque<pipes::buffer> queue_read;
std::deque<pipes::buffer> queue_write;
@ -93,7 +105,6 @@ namespace ts {
public:
void send_voice_packet(const pipes::buffer_view &view, const VoicePacketFlags &flags) override;
void send_voice_whisper_packet(const pipes::buffer_view &view, const VoicePacketFlags &flags) override;
protected:

View File

@ -18,6 +18,203 @@ using namespace ts::server::conversation;
namespace fs = std::experimental::filesystem;
/* Using const O3 to improve unreadability */
#if 0
/*
/* Debug */
0x555555c542a2 push rbp
0x555555c542a3 mov rbp,rsp
0x555555c542a6 mov QWORD PTR [rbp-0x28],rdi
0x555555c542aa mov QWORD PTR [rbp-0x30],rsi
0x555555c542ae mov QWORD PTR [rbp-0x38],rdx
0x555555c542b2 mov QWORD PTR [rbp-0x40],rcx
0x555555c542b6 mov rax,QWORD PTR [rbp-0x40]
0x555555c542ba mov QWORD PTR [rbp-0x20],rax
0x555555c542be mov rax,QWORD PTR [rbp-0x38]
0x555555c542c2 mov QWORD PTR [rbp-0x18],rax
0x555555c542c6 mov rax,QWORD PTR [rbp-0x28]
0x555555c542ca mov QWORD PTR [rbp-0x10],rax
0x555555c542ce mov rax,QWORD PTR [rbp-0x30]
0x555555c542d2 mov QWORD PTR [rbp-0x8],rax
/* first loop */
0x555555c542d6 cmp QWORD PTR [rbp-0x18],0x7
0x555555c542db jbe 0x555555c5431e <apply_crypt(void*, void*, unsigned long, unsigned long)+124>
0x555555c542dd mov rax,QWORD PTR [rbp-0x18]
0x555555c542e1 and eax,0x7
0x555555c542e4 mov rdx,QWORD PTR [rbp-0x20]
0x555555c542e8 mov ecx,eax
0x555555c542ea shl rdx,cl
0x555555c542ed mov rax,rdx
0x555555c542f0 xor rax,QWORD PTR [rbp-0x18]
0x555555c542f4 xor QWORD PTR [rbp-0x20],rax
0x555555c542f8 mov rax,QWORD PTR [rbp-0x10]
0x555555c542fc mov rax,QWORD PTR [rax]
0x555555c542ff xor rax,QWORD PTR [rbp-0x20]
0x555555c54303 mov rdx,rax
0x555555c54306 mov rax,QWORD PTR [rbp-0x8]
0x555555c5430a mov QWORD PTR [rax],rdx
0x555555c5430d add QWORD PTR [rbp-0x8],0x8
0x555555c54312 add QWORD PTR [rbp-0x10],0x8
0x555555c54317 sub QWORD PTR [rbp-0x18],0x8
0x555555c5431c jmp 0x555555c542d6 /* first loop */
/* Second loop */
0x555555c5431e cmp QWORD PTR [rbp-0x18],0x0
0x555555c54323 je 0x555555c54364 <apply_crypt(void*, void*, unsigned long, unsigned long)+194>
0x555555c54325 mov rax,QWORD PTR [rbp-0x18]
0x555555c54329 and eax,0x7
0x555555c5432c mov rdx,QWORD PTR [rbp-0x20]
0x555555c54330 mov ecx,eax
0x555555c54332 shl rdx,cl
0x555555c54335 mov rax,rdx
0x555555c54338 xor rax,QWORD PTR [rbp-0x18]
0x555555c5433c xor QWORD PTR [rbp-0x20],rax
0x555555c54340 mov rax,QWORD PTR [rbp-0x10]
0x555555c54344 movzx edx,BYTE PTR [rax]
0x555555c54347 mov rax,QWORD PTR [rbp-0x20]
0x555555c5434b xor edx,eax
0x555555c5434d mov rax,QWORD PTR [rbp-0x8]
0x555555c54351 mov BYTE PTR [rax],dl
0x555555c54353 sub QWORD PTR [rbp-0x18],0x1
0x555555c54358 add QWORD PTR [rbp-0x10],0x1
0x555555c5435d add QWORD PTR [rbp-0x8],0x1
0x555555c54362 jmp 0x555555c5431e /* second loop */
0x555555c54364 nop
0x555555c54365 pop rbp
0x555555c54366 ret
/* O3 */
0x555555c41d8f push rbp
0x555555c41d90 cmp rdx,0x7
0x555555c41d94 mov r8,rcx
0x555555c41d97 mov rbp,rsp
0x555555c41d9a push r14
0x555555c41d9c push rbx
0x555555c41d9d jbe 0x555555c41df8 <apply_crypt(void*, void*, unsigned long, unsigned long)+105>
0x555555c41d9f lea rbx,[rdx-0x8]
0x555555c41da3 mov r10,rsi
0x555555c41da6 mov r9,rdi
0x555555c41da9 mov rax,rdx
0x555555c41dac mov r11,rbx
0x555555c41daf and r11d,0x7
0x555555c41db3 mov ecx,eax
0x555555c41db5 mov r14,r8
0x555555c41db8 add r10,0x8
0x555555c41dbc and ecx,0x7
0x555555c41dbf add r9,0x8
0x555555c41dc3 shl r14,cl
0x555555c41dc6 mov rcx,r14
0x555555c41dc9 xor rcx,rax
0x555555c41dcc sub rax,0x8
0x555555c41dd0 xor r8,rcx
0x555555c41dd3 mov rcx,QWORD PTR [r9-0x8]
0x555555c41dd7 xor rcx,r8
0x555555c41dda mov QWORD PTR [r10-0x8],rcx
0x555555c41dde cmp rax,r11
0x555555c41de1 jne 0x555555c41db3 <apply_crypt(void*, void*, unsigned long, unsigned long)+36>
0x555555c41de3 shr rbx,0x3
0x555555c41de7 and edx,0x7
0x555555c41dea lea rax,[rbx*8+0x8]
0x555555c41df2 add rdi,rax
0x555555c41df5 add rsi,rax
0x555555c41df8 test rdx,rdx
0x555555c41dfb je 0x555555c41ed3 <apply_crypt(void*, void*, unsigned long, unsigned long)+324>
0x555555c41e01 mov rax,r8
0x555555c41e04 mov ecx,edx
0x555555c41e06 xor r8,rdx
0x555555c41e09 shl rax,cl
0x555555c41e0c mov rcx,rdx
0x555555c41e0f xor r8,rax
0x555555c41e12 movzx eax,BYTE PTR [rdi]
0x555555c41e15 xor eax,r8d
0x555555c41e18 sub rcx,0x1
0x555555c41e1c mov BYTE PTR [rsi],al
0x555555c41e1e je 0x555555c41ed3 <apply_crypt(void*, void*, unsigned long, unsigned long)+324>
0x555555c41e24 mov rax,r8
0x555555c41e27 xor r8,rcx
0x555555c41e2a shl rax,cl
0x555555c41e2d mov rcx,rdx
0x555555c41e30 xor r8,rax
0x555555c41e33 movzx eax,BYTE PTR [rdi+0x1]
0x555555c41e37 xor eax,r8d
0x555555c41e3a sub rcx,0x2
0x555555c41e3e mov BYTE PTR [rsi+0x1],al
0x555555c41e41 je 0x555555c41ed3 <apply_crypt(void*, void*, unsigned long, unsigned long)+324>
0x555555c41e47 mov rax,r8
0x555555c41e4a xor r8,rcx
0x555555c41e4d shl rax,cl
0x555555c41e50 mov rcx,rdx
0x555555c41e53 xor r8,rax
0x555555c41e56 movzx eax,BYTE PTR [rdi+0x2]
0x555555c41e5a xor eax,r8d
0x555555c41e5d sub rcx,0x3
0x555555c41e61 mov BYTE PTR [rsi+0x2],al
0x555555c41e64 je 0x555555c41ed3 <apply_crypt(void*, void*, unsigned long, unsigned long)+324>
0x555555c41e66 mov rax,r8
0x555555c41e69 xor r8,rcx
0x555555c41e6c shl rax,cl
0x555555c41e6f mov rcx,rdx
0x555555c41e72 xor r8,rax
0x555555c41e75 movzx eax,BYTE PTR [rdi+0x3]
0x555555c41e79 xor eax,r8d
0x555555c41e7c sub rcx,0x4
0x555555c41e80 mov BYTE PTR [rsi+0x3],al
0x555555c41e83 je 0x555555c41ed3 <apply_crypt(void*, void*, unsigned long, unsigned long)+324>
0x555555c41e85 mov rax,r8
0x555555c41e88 xor r8,rcx
0x555555c41e8b shl rax,cl
0x555555c41e8e mov rcx,rdx
0x555555c41e91 xor r8,rax
0x555555c41e94 movzx eax,BYTE PTR [rdi+0x4]
0x555555c41e98 xor eax,r8d
0x555555c41e9b sub rcx,0x5
0x555555c41e9f mov BYTE PTR [rsi+0x4],al
0x555555c41ea2 je 0x555555c41ed3 <apply_crypt(void*, void*, unsigned long, unsigned long)+324>
0x555555c41ea4 mov rax,r8
0x555555c41ea7 xor r8,rcx
0x555555c41eaa shl rax,cl
0x555555c41ead xor r8,rax
0x555555c41eb0 movzx eax,BYTE PTR [rdi+0x5]
0x555555c41eb4 xor eax,r8d
0x555555c41eb7 cmp rdx,0x6
0x555555c41ebb mov BYTE PTR [rsi+0x5],al
0x555555c41ebe je 0x555555c41ed3 <apply_crypt(void*, void*, unsigned long, unsigned long)+324>
0x555555c41ec0 lea rax,[r8+r8*1]
0x555555c41ec4 xor r8,0x1
0x555555c41ec8 xor r8,rax
0x555555c41ecb xor r8b,BYTE PTR [rdi+0x6]
0x555555c41ecf mov BYTE PTR [rsi+0x6],r8b
0x555555c41ed3 pop rbx
0x555555c41ed4 pop r14
0x555555c41ed6 pop rbp
0x555555c41ed7 ret
*/
#endif
__attribute__((optimize("-O3"), always_inline)) void apply_crypt(void* source, void* target, size_t length, uint64_t base_key) {
uint64_t crypt_key = base_key;
size_t length_left = length;
auto source_ptr = (uint8_t*) source;
auto dest_ptr = (uint8_t*) target;
while(length_left >= 8) {
crypt_key ^= (crypt_key << (length_left & 0x7U)) ^ length_left;
*(uint64_t*) dest_ptr = *(uint64_t*) source_ptr ^ crypt_key;
dest_ptr += 8;
source_ptr += 8;
length_left -= 8;
}
while(length_left > 0) {
crypt_key ^= (crypt_key << (length_left & 0x7U)) ^ length_left;
*dest_ptr = *source_ptr ^ (uint8_t) crypt_key;
length_left--;
source_ptr++;
dest_ptr++;
}
}
Conversation::Conversation(const std::shared_ptr<ts::server::conversation::ConversationManager> &handle, ts::ChannelId channel_id, const std::string& file) : _ref_handle(handle), _channel_id(channel_id), file_name(file) { }
Conversation::~Conversation() {
@ -50,6 +247,7 @@ bool Conversation::initialize(std::string& error) {
this->file_handle = fopen(this->file_name.c_str(), fs::exists(file) ? "r+" : "w+");
if(!this->file_handle) {
this->_volatile = true;
error = "failed to open file";
return false;
}
@ -197,6 +395,11 @@ bool Conversation::initialize(std::string& error) {
else
this->_last_message_timestamp = system_clock::time_point{};
}
/* close the file handle because we've passed our checks */
{
fclose(this->file_handle);
this->file_handle = nullptr;
}
return true;
}
@ -214,14 +417,59 @@ void Conversation::finalize() {
}
void Conversation::cleanup_cache() {
//FIXME: Implement this shit here!
auto ref_handle = this->ref_handle();
if(!ref_handle)
return;
auto ref_server = ref_handle->ref_server();
if(!ref_server)
return;
{
lock_guard block(this->message_block_lock);
for(auto& block : this->message_blocks) {
block->block_header = nullptr;
block->indexed_block = nullptr;
}
}
{
lock_guard file_lock(this->file_handle_lock);
if(this->last_access + minutes(5) < system_clock::now()) {
if(this->file_handle) {
fclose(this->file_handle);
this->file_handle = nullptr;
debugMessage(ref_server->getServerId(), "[Conversations][{}] Closing file handle due to inactivity.", this->_channel_id);
}
}
}
}
ssize_t Conversation::fread(void *target, size_t length, ssize_t index) {
bool Conversation::setup_file() {
this->file_handle = fopen(this->file_name.c_str(), fs::exists(this->file_name) ? "r+" : "w+");
if(!this->file_handle) {
auto ref_handle = this->ref_handle();
if(!ref_handle)
return false;
auto ref_server = ref_handle->ref_server();
if(!ref_server)
return false;
logError(ref_server->getServerId(), "[Conversations][{}] Failed to open closed file handle. ({} | {})", errno, strerror(errno));
return false;
}
setbuf(this->file_handle, nullptr); /* we're doing random access (a buffer is useless here) */
return true;
}
ssize_t Conversation::fread(void *target, size_t length, ssize_t index, bool acquire_lock) {
if(length == 0)
return 0;
lock_guard file_lock(this->file_handle_lock);
unique_lock file_lock(this->file_handle_lock, defer_lock);
if(acquire_lock)
file_lock.lock();
this->last_access = system_clock::now();
if(!this->file_handle && !this->setup_file())
return -3;
if(index >= 0) {
auto result = fseek(this->file_handle, index, SEEK_SET);
if(result < 0)
@ -238,12 +486,17 @@ ssize_t Conversation::fread(void *target, size_t length, ssize_t index) {
return total_read;
}
ssize_t Conversation::fwrite(void *target, size_t length, ssize_t index, bool extend_file) {
ssize_t Conversation::fwrite(void *target, size_t length, ssize_t index, bool extend_file, bool acquire_lock) {
if(length == 0)
return 0;
unique_lock file_lock(this->file_handle_lock, defer_lock);
if(acquire_lock)
file_lock.lock();
extend_file = false; /* fseek does the job good ad well */
lock_guard file_lock(this->file_handle_lock);
if(!this->file_handle && !this->setup_file())
return -3;
this->last_access = system_clock::now();
if(index >= 0) {
auto result = extend_file ? lseek(fileno(this->file_handle), index, SEEK_SET) : fseek(this->file_handle, index, SEEK_SET);
if(result < 0)
@ -265,7 +518,7 @@ bool Conversation::load_message_block_header(const std::shared_ptr<ts::server::c
return true;
auto block_header = make_unique<fio::BlockHeader>();
if(this->fread(&*block_header, sizeof(*block_header), block->block_offset) != sizeof(*block_header)) {
if(this->fread(&*block_header, sizeof(*block_header), block->block_offset, true) != sizeof(*block_header)) {
error = "failed to read block header";
return false;
}
@ -311,7 +564,7 @@ bool Conversation::load_message_block_index(const std::shared_ptr<ts::server::co
fio::MessageHeader header{};
while(offset < max_offset) {
if(this->fread(&header, sizeof(header), offset) != sizeof(header)) {
if(this->fread(&header, sizeof(header), offset, true) != sizeof(header)) {
error = "failed to read message header at index" + to_string(offset);
return false;
}
@ -344,6 +597,12 @@ bool Conversation::load_messages(const std::shared_ptr<db::MessageBlock> &block,
if(index >= indexed_block->message_index.size())
return true;
unique_lock file_lock(this->file_handle_lock);
if(!this->file_handle && !this->setup_file()) {
error = "failed to open file handle";
return false;
}
auto result = fseek(this->file_handle, block->block_offset + indexed_block->message_index[index].offset, SEEK_SET);
if(result == EINVAL) {
error = "failed to seek to begin of an indexed block read";
@ -357,7 +616,7 @@ bool Conversation::load_messages(const std::shared_ptr<db::MessageBlock> &block,
}
auto data = make_shared<fio::IndexedMessageData>();
if(this->fread(&data->header, sizeof(data->header), -1) != sizeof(data->header)) {
if(this->fread(&data->header, sizeof(data->header), -1, false) != sizeof(data->header)) {
error = "failed to read message header at index " + to_string(index);
return false;
}
@ -367,42 +626,44 @@ bool Conversation::load_messages(const std::shared_ptr<db::MessageBlock> &block,
return false;
}
data->sender_unique_id.resize(data->header.sender_unique_id_length);
data->sender_name.resize(data->header.sender_name_length);
if(header->meta_encrypted) {
auto meta_size = data->header.sender_unique_id_length + data->header.sender_name_length;
auto meta_buffer = malloc(meta_size);
if(this->fread(meta_buffer, meta_size, -1, false) != meta_size) {
error = "failed to read message metadata at " + to_string(index);
free(meta_buffer);
return false;
}
apply_crypt(meta_buffer, meta_buffer, meta_size, (block->block_offset ^ data->header.message_timestamp) ^ 0x6675636b20796f75ULL); /* 0x6675636b20796f75 := 'fuck you' */
data->sender_unique_id.assign((char*) meta_buffer, data->header.sender_unique_id_length);
data->sender_name.assign((char*) meta_buffer + data->header.sender_unique_id_length, data->header.sender_name_length);
free(meta_buffer);
} else {
data->sender_unique_id.resize(data->header.sender_unique_id_length);
data->sender_name.resize(data->header.sender_name_length);
if(this->fread(data->sender_unique_id.data(), data->sender_unique_id.length(), -1, false) != data->sender_unique_id.length()) {
error = "failed to read message sender unique id at " + to_string(index);
return false;
}
if(this->fread(data->sender_name.data(), data->sender_name.length(), -1, false) != data->sender_name.length()) {
error = "failed to read message sender name id at " + to_string(index);
return false;
}
}
data->message.resize(data->header.message_length);
if(this->fread(data->sender_unique_id.data(), data->sender_unique_id.length(), -1) != data->sender_unique_id.length()) {
error = "failed to read message sender unique id at " + to_string(index);
return false;
}
if(this->fread(data->sender_name.data(), data->sender_name.length(), -1) != data->sender_name.length()) {
error = "failed to read message sender name id at " + to_string(index);
return false;
}
if(this->fread(data->message.data(), data->message.length(), -1) != data->message.length()) {
if(this->fread(data->message.data(), data->message.length(), -1, false) != data->message.length()) {
error = "failed to read message id at " + to_string(index);
return false;
}
if(header->message_encrypted) {
uint64_t crypt_key = block->block_offset ^ data->header.message_timestamp;
size_t length_left = data->message.size();
auto ptr = (char*) data->message.data();
while(length_left >= 8) {
crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left;
*(uint64_t*) ptr ^= crypt_key;
ptr += 8;
length_left -= 8;
}
while(length_left > 0) {
crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left;
*ptr ^= (uint8_t) crypt_key;
length_left--;
ptr++;
}
}
if(header->message_encrypted)
apply_crypt(data->message.data(), data->message.data(), data->message.size(), block->block_offset ^ data->header.message_timestamp);
message_data.message_data = data;
index++;
@ -452,7 +713,7 @@ void Conversation::finish_block(const std::shared_ptr<ts::server::conversation::
}
bool Conversation::write_block_header(const std::shared_ptr<fio::BlockHeader> &header, size_t index, std::string &error) {
auto code = this->fwrite(&*header, sizeof(fio::BlockHeader), index, false);
auto code = this->fwrite(&*header, sizeof(fio::BlockHeader), index, false, true);
if(code == sizeof(fio::BlockHeader))
return true;
error = "write returned " + to_string(code);
@ -527,6 +788,10 @@ void Conversation::process_write_queue(const std::chrono::system_clock::time_poi
//TODO: Find "free" blocks and use them! (But do not use indirectly finished blocks, their max size could be invalid)
unique_lock file_lock(this->file_handle_lock);
if(!this->file_handle && !this->setup_file()) {
logError(ref_server->getServerId(), "[Conversations][{}] Failed to reopen log file. Dropping message!", this->_channel_id);
return;
}
auto result = fseek(this->file_handle, 0, SEEK_END);
if(result != 0) {
logError(ref_server->getServerId(), "[Conversations][{}] failed to seek to the end (" + to_string(result) + " " + to_string(errno) + "). Could not create new block. Dropping message!", this->_channel_id);
@ -553,7 +818,8 @@ void Conversation::process_write_queue(const std::chrono::system_clock::time_poi
block_header->first_message_timestamp = (uint64_t) duration_cast<milliseconds>(write_entry->message_timestamp.time_since_epoch()).count();
block_header->block_size = sizeof(fio::BlockHeader);
//block_header->message_encrypted = true; /* May add some kind of hidden debug option? */
block_header->message_encrypted = true; /* May add some kind of hidden debug option? */
block_header->meta_encrypted = true; /* May add some kind of hidden debug option? */
this->last_message_block->block_header = block_header;
}
@ -563,61 +829,52 @@ void Conversation::process_write_queue(const std::chrono::system_clock::time_poi
block_header->last_message_timestamp = write_header.message_timestamp;
/* first write the header */
if(this->fwrite(&write_header, sizeof(write_header), entry_offset, true) != sizeof(write_header)) {
if(this->fwrite(&write_header, sizeof(write_header), entry_offset, true, true) != sizeof(write_header)) {
logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message header. Dropping message!", this->_channel_id);
return;
}
entry_offset += sizeof(write_header);
/* then write the sender unique id */
if(this->fwrite(write_entry->sender_unique_id.data(), write_header.sender_unique_id_length, entry_offset, true) != write_header.sender_unique_id_length) {
logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message sender unique id. Dropping message!", this->_channel_id);
return;
}
entry_offset += write_header.sender_unique_id_length;
/* write the metadata */
{
auto write_buffer_size = write_header.sender_unique_id_length + write_header.sender_name_length;
auto write_buffer = malloc(write_buffer_size);
/* then write the sender name */
if(this->fwrite(write_entry->sender_name.data(), write_header.sender_name_length, entry_offset, true) != write_header.sender_name_length) {
logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message sender name. Dropping message!", this->_channel_id);
return;
memcpy(write_buffer, write_entry->sender_unique_id.data(), write_header.sender_unique_id_length);
memcpy((char*) write_buffer + write_header.sender_unique_id_length, write_entry->sender_name.data(), write_header.sender_name_length);
if(block_header->meta_encrypted)
apply_crypt(write_buffer, write_buffer, write_buffer_size, (this->last_message_block->block_offset ^ write_header.message_timestamp) ^ 0x6675636b20796f75ULL); /* 0x6675636b20796f75 := 'fuck you' */
/* then write the sender unique id */
if(this->fwrite(write_buffer, write_buffer_size, entry_offset, true, true) != write_buffer_size) {
logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message header. Dropping message!", this->_channel_id);
free(write_buffer);
return;
}
free(write_buffer);
entry_offset += write_buffer_size;
}
entry_offset += write_header.sender_name_length;
/* then write the message */
bool message_result;
if(block_header->message_encrypted) {
uint64_t crypt_key = this->last_message_block->block_offset ^ write_header.message_timestamp;
size_t length_left = write_entry->message.size();
auto ptr = (char*) write_entry->message.data();
char* target_buffer = (char*) malloc(length_left);
char* target_buffer_ptr = target_buffer;
assert(target_buffer);
{
bool message_result;
if(block_header->message_encrypted) {
size_t length = write_entry->message.size();
char* target_buffer = (char*) malloc(length);
apply_crypt(write_entry->message.data(), target_buffer, length, this->last_message_block->block_offset ^ write_header.message_timestamp);
while(length_left >= 8) {
crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left;
*(uint64_t*) target_buffer_ptr = crypt_key;
ptr += 8;
target_buffer_ptr += 8;
length_left -= 8;
message_result = this->fwrite(target_buffer, write_header.message_length, entry_offset, true, true) == write_header.message_length;
free(target_buffer);
} else {
message_result = this->fwrite(write_entry->message.data(), write_header.message_length, entry_offset, true, true) == write_header.message_length;
}
while(length_left > 0) {
crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left;
*target_buffer_ptr = *ptr ^ (uint8_t) crypt_key;
length_left--;
ptr++;
target_buffer_ptr++;
if(!message_result) {
logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message. Dropping message!", this->_channel_id);
return;
}
message_result = this->fwrite(target_buffer, write_header.message_length, entry_offset, true) == write_header.message_length;
free(target_buffer);
} else {
message_result = this->fwrite(write_entry->message.data(), write_header.message_length, entry_offset, true) == write_header.message_length;
entry_offset += write_header.message_length;
}
if(!message_result) {
logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message. Dropping message!", this->_channel_id);
return;
}
entry_offset += write_header.message_length;
block_header->last_message_offset = (uint32_t) (entry_offset - this->last_message_block->block_offset - sizeof(fio::BlockHeader));
block_header->block_size += write_header.total_length;
@ -751,6 +1008,14 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
}
if(!this->volatile_only()) {
auto handle = this->_ref_handle.lock();
if(!handle)
return result;
auto ref_server = handle->ref_server();
if(!ref_server)
return result;
auto timestamp = result.empty() ? end_timestamp : result.back()->message_timestamp;
unique_lock lock(this->message_block_lock);
@ -776,12 +1041,12 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
auto block = *_rit;
/* lets search for messages */
if(!this->load_message_block_index(block, error)) {
//TODO: Log error
logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to load message block {} for message lookup: {}", this->_channel_id, block->block_offset, error);
continue;
}
auto index = (*_rit)->indexed_block;
if(!index) {
//TODO Log error
logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to reference indexed block within message block.", this->_channel_id);
continue;
}
@ -803,7 +1068,7 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
if(!this->load_messages(block, 0, std::distance(index->message_index.begin(), rmid) + 1, error)) {
//TODO: Log error
logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to load messages within block {} for message lookup: {}", this->_channel_id, block->block_offset, error);
continue;
}
do {
@ -813,6 +1078,8 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
if(begin_timestamp.time_since_epoch().count() != 0 && rmid->timestamp < begin_timestamp)
return result;
if(rmid->timestamp >= timestamp)
continue; /* for some reason we got a message from the index of before where we are. This could happen for "orphaned" blocks which point to a valid block within the future block */
/*
std::chrono::system_clock::time_point message_timestamp;
@ -831,6 +1098,7 @@ std::deque<std::shared_ptr<ConversationEntry>> Conversation::message_history(con
data->message
}));
timestamp = rmid->timestamp;
if(--message_count == 0)
return result;
} while(rmid-- != index->message_index.begin());

View File

@ -143,7 +143,7 @@ namespace ts {
inline ChannelId channel_id() { return this->_channel_id; }
/* if for some reason we're not able to open the file then we're in volatile mode */
inline bool volatile_only() { return !this->file_handle; }
inline bool volatile_only() { return this->_volatile; }
void cleanup_cache();
//void set_history_length(ssize_t /* save length */);
@ -167,8 +167,10 @@ namespace ts {
ts_always_inline std::shared_ptr<ConversationManager> ref_handle() {
return this->_ref_handle.lock();
}
inline ssize_t fread(void* target, size_t length, ssize_t index);
inline ssize_t fwrite(void* target, size_t length, ssize_t index, bool extend_file);
inline bool setup_file();
inline ssize_t fread(void* target, size_t length, ssize_t index, bool acquire_handle);
inline ssize_t fwrite(void* target, size_t length, ssize_t index, bool extend_file, bool acquire_handle);
/* block db functions */
void db_save_block(const std::shared_ptr<db::MessageBlock>& /* block */);
@ -204,9 +206,12 @@ namespace ts {
/* basic file stuff */
std::string file_name;
std::mutex file_handle_lock;
std::chrono::system_clock::time_point last_access;
FILE* file_handle = nullptr;
ChannelId _channel_id;
bool _volatile = false;
std::chrono::system_clock::time_point _last_message_timestamp;
};

View File

@ -36,12 +36,12 @@ QueryServer::~QueryServer() {
void QueryServer::unregisterConnection(const shared_ptr<QueryClient> &client) {
{
threads::MutexLock lock(this->clientLock);
lock_guard lock(this->connected_clients_lock);
auto found = std::find(this->connectedClients.begin(), this->connectedClients.end(), client);
if(found != this->connectedClients.end())
this->connectedClients.erase(found);
else
logError(LOG_QUERY, "Attempted to unregister an invalid query connection!");
logError(LOG_QUERY, "Attempted to unregister an invalid connection!");
}
if(client->server) {
@ -52,80 +52,111 @@ void QueryServer::unregisterConnection(const shared_ptr<QueryClient> &client) {
/* client->handle = nullptr; */
}
bool QueryServer::start(const sockaddr_in& localAdress, std::string& errorMessage) {
if(this->running()) return false;
this->active = true;
boundAddress = new sockaddr_in;
memcpy(boundAddress, &localAdress, sizeof(localAdress));
bool QueryServer::start(const deque<shared_ptr<QueryServer::Binding>> &bindings, std::string &error) {
if(this->active) {
error = "already started";
return false;
}
this->active = true;
ip_blacklist.reset(new IpListManager("query_ip_blacklist.txt", {"#A new line separated address blacklist", "#", "#For example if we dont want google:", "8.8.8.8"}));
ip_whitelist.reset(new IpListManager("query_ip_whitelist.txt", {"#A new line separated address whitelist", "#Every ip have no flood and login attempt limit!", "127.0.0.1/8", "::1"}));
string error;
if(!this->ip_blacklist->reload(error)) logError(LOG_QUERY, "Failed to load query blacklist: {}", error);
if(!this->ip_whitelist->reload(error)) logError(LOG_QUERY, "Failed to load query whitelist: {}", error);
this->serverSocket = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (serverSocket < 0) {
logCritical("Cant create server socket for file server");
return false;
}
/* load ip black/whitelist */
{
ip_blacklist.reset(new IpListManager("query_ip_blacklist.txt", {"#A new line separated address blacklist", "#", "#For example if we dont want google:", "8.8.8.8"}));
ip_whitelist.reset(new IpListManager("query_ip_whitelist.txt", {"#A new line separated address whitelist", "#Every ip have no flood and login attempt limit!", "127.0.0.1/8", "::1"}));
string error;
if(!this->ip_blacklist->reload(error)) logError(LOG_QUERY, "Failed to load query blacklist: {}", error);
if(!this->ip_whitelist->reload(error)) logError(LOG_QUERY, "Failed to load query whitelist: {}", error);
}
int enable = 1;
int disabled = 0;
if (setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
logError("setsockopt(SO_REUSEADDR) failed");
if(setsockopt(serverSocket, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0)
logError("Cant disable nopush! Error: "+to_string(errno)+" / "+strerror(errno));
setsockopt(serverSocket, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
/* reserve backup file descriptor in case that the max file descriptors have been reached */
{
this->server_reserve_fd = dup(1);
if(this->server_reserve_fd < 0)
logWarning(LOG_QUERY, "Failed to reserve a backup accept file descriptor. ({} | {})", errno, strerror(errno));
}
if(fcntl(serverSocket, F_SETFD, FD_CLOEXEC) < 0)
logError(LOG_QUERY, "Failed to enable FD_CLOEXEC for {} (QueryServer)", serverSocket);
/* setup event bases */
{
this->eventLoop = event_base_new();
this->ioThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [&]{
while(this->active) {
debugMessage(LOG_QUERY, "Entering event loop ({})", (void*) this->eventLoop);
event_base_loop(this->eventLoop, EVLOOP_NO_EXIT_ON_EMPTY);
if(this->active) {
debugMessage(LOG_QUERY, "Event loop exited ({}). No active events. Sleeping 1 seconds", (void*) this->eventLoop);
this_thread::sleep_for(seconds(1));
} else {
debugMessage(LOG_QUERY, "Event loop exited ({})", (void*) this->eventLoop);
}
}
});
this->ioThread->name("EVENT Query").execute();
}
if (bind(serverSocket, (struct sockaddr *) &localAdress, sizeof(localAdress)) < 0) {
errorMessage = string() + "Cant bind server socket (" + strerror(errno) + ")";
return false;
}
if(listen(serverSocket, 255) < 0){
errorMessage = string() + "Cant listen on server socket (" + strerror(errno) + ")";
return false;
}
for(auto& binding : bindings) {
binding->file_descriptor = socket(binding->address.ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
if(binding->file_descriptor < 0) {
logError(LOG_QUERY, "Failed to bind server to {}. (Failed to create socket: {} | {})", binding->as_string(), errno, strerror(errno));
continue;
}
this->eventLoop = event_base_new();
this->acceptEvent = event_new(this->eventLoop, this->serverSocket, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryServer*) c)->onClientAccept(a, b, c); }, this);
event_add(this->acceptEvent, nullptr);
int enable = 1, disabled = 0;
this->ioThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [&](){
debugMessage(LOG_QUERY, "Event base executed ({})", (void*) this->eventLoop);
event_base_dispatch(this->eventLoop);
debugMessage(LOG_QUERY, "Event base terminated ({})", (void*) this->eventLoop);
});
this->ioThread->name("EVENT Query").execute();
if (setsockopt(binding->file_descriptor, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
logWarning(LOG_QUERY, "Failed to activate SO_REUSEADDR for binding {} ({} | {})", binding->as_string(), errno, strerror(errno));
if(setsockopt(binding->file_descriptor, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0)
logWarning(LOG_QUERY, "Failed to deactivate TCP_NOPUSH for binding {} ({} | {})", binding->as_string(), errno, strerror(errno));
if(binding->address.ss_family == AF_INET6) {
if(setsockopt(binding->file_descriptor, IPPROTO_IPV6, IPV6_V6ONLY, &enable, sizeof(int)) < 0)
logWarning(LOG_QUERY, "Failed to activate IPV6_V6ONLY for IPv6 binding {} ({} | {})", binding->as_string(), errno, strerror(errno));
}
if(fcntl(binding->file_descriptor, F_SETFD, FD_CLOEXEC) < 0)
logWarning(LOG_QUERY, "Failed to set flag FD_CLOEXEC for binding {} ({} | {})", binding->as_string(), errno, strerror(errno));
this->tickingId = serverInstance->scheduler()->schedule("query", bind(&QueryServer::tick, this), seconds(1));
return true;
if (bind(binding->file_descriptor, (struct sockaddr *) &binding->address, sizeof(binding->address)) < 0) {
logError(LOG_QUERY, "Failed to bind server to {}. (Failed to bind socket: {} | {})", binding->as_string(), errno, strerror(errno));
close(binding->file_descriptor);
continue;
}
if (listen(binding->file_descriptor, SOMAXCONN) < 0) {
logError(LOG_QUERY, "Failed to bind server to {}. (Failed to listen: {} | {})", binding->as_string(), errno, strerror(errno));
close(binding->file_descriptor);
continue;
}
binding->event_accept = event_new(this->eventLoop, binding->file_descriptor, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryServer *) c)->on_client_receive(a, b, c); }, this);
event_add(binding->event_accept, nullptr);
this->bindings.push_back(binding);
}
if(this->bindings.empty()) {
this->stop();
error = "failed to bind to any address";
return false;
}
this->tickingId = serverInstance->scheduler()->schedule("query", bind(&QueryServer::tick, this), seconds(1));
return true;
}
void QueryServer::stop() {
if(!this->running()) return;
if(!this->running())
return;
active = false;
serverInstance->scheduler()->cancelTask("query");
this->clientLock.lock();
auto clList = this->connectedClients;
this->clientLock.unlock();
this->connected_clients_lock.lock();
auto connected_clients = this->connectedClients;
this->connected_clients_lock.unlock();
Command cmd("serverstop");
cmd["stopped"] = true;
for(const auto &client : clList){
for(const auto &client : connected_clients){
client->sendCommand(cmd);
client->disconnect("server stopped");
}
if(this->acceptEvent){
event_del(this->acceptEvent);
event_free(this->acceptEvent);
this->acceptEvent = nullptr;
}
{
auto now = system_clock::now();
while(!this->connectedClients.empty()) {
@ -144,6 +175,22 @@ void QueryServer::stop() {
}
this->threads.clear();
for(auto& binding : this->bindings) {
if(binding->event_accept) {
event_del_block(binding->event_accept);
event_free(binding->event_accept);
binding->event_accept = nullptr;
}
if(binding->file_descriptor > 0) {
if(shutdown(binding->file_descriptor, SHUT_RDWR) < 0)
logWarning(LOG_QUERY, "Failed to shutdown socket for binding {} ({} | {}).", binding->as_string(), errno, strerror(errno));
if(close(binding->file_descriptor) < 0)
logError(LOG_QUERY, "Failed to close socket for binding {} ({} | {}).", binding->as_string(), errno, strerror(errno));
binding->file_descriptor = -1;
}
}
this->bindings.clear();
if(this->eventLoop)
event_base_loopexit(this->eventLoop, nullptr);
if(this->ioThread) {
@ -160,41 +207,198 @@ void QueryServer::stop() {
this->eventLoop = nullptr;
}
delete this->boundAddress;
this->boundAddress = nullptr;
if(this->serverSocket > 0) {
if(shutdown(this->serverSocket, SHUT_RDWR) < 0) logError(LOG_QUERY, "Could not shutdown server socket!");
if(close(this->serverSocket) < 0) logError(LOG_QUERY, "Could not close server socket!");
if(this->server_reserve_fd > 0) {
if(close(this->server_reserve_fd) < 0)
logError(LOG_QUERY, "Failed to close backup file descriptor ({} | {})", errno, strerror(errno));
}
this->serverSocket = -1;
this->server_reserve_fd = -1;
}
void QueryServer::onClientAccept(int fd, short ev, void *arg) {
sockaddr_in remoteAddr{};
memset(&remoteAddr, 0, sizeof(sockaddr_in));
socklen_t addrLength = sizeof(remoteAddr);
inline std::string logging_address(const sockaddr_storage& address) {
if(config::server::disable_ip_saving)
return "X.X.X.X" + to_string(net::port(address));
return net::to_string(address, true);
}
int acceptedSocketFd = accept(serverSocket, (struct sockaddr *) &remoteAddr, &addrLength);
if (acceptedSocketFd < 0) {
if(errno == EAGAIN) { //No manager
inline void send_direct_disconnect(const sockaddr_storage& address, int file_descriptor, const char* message, size_t message_length) {
auto _non_block = [&]{
int flags = fcntl(file_descriptor, F_GETFL, 0);
if (flags == -1) {
debugMessage(LOG_QUERY, "[{}] Failed to set socket to nonblock. Flag query failed ({} | {})", logging_address(address), errno, strerror(errno));
return;
}
flags &= ~O_NONBLOCK;
if(fcntl(file_descriptor, F_SETFL, flags) == -1) {
debugMessage(LOG_QUERY, "[{}] Failed to set socket to nonblock. Flag apply failed ({} | {})", logging_address(address), errno, strerror(errno));
return;
}
};
_non_block();
{
struct timeval timeout{};
timeout.tv_sec = 5;
timeout.tv_usec = 0;
if (setsockopt (file_descriptor, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0)
debugMessage(LOG_QUERY, "[{}] Failed to set the send timeout on socket", logging_address(address));
}
bool broken_pipe = false;
auto _send = [&](const char* data, size_t length) {
if(broken_pipe)
return;
size_t written_bytes = 0;
while(written_bytes < length) {
auto result = send(file_descriptor, data + written_bytes, length - written_bytes, MSG_NOSIGNAL);
if(result <= 0) {
broken_pipe |= errno == EPIPE;
debugMessage(LOG_QUERY, "[{}] Failed to send a message of length {}. Bytes written: {}, error: {} | {}", logging_address(address), length, written_bytes, errno, strerror(errno));
return;
} else {
written_bytes += result;
}
}
};
/* we could ignore errors here */
_send(config::query::motd.data(), config::query::motd.size());
_send(message, message_length);
/* "flush" with the last new line and then close */
int flag = 1;
if(setsockopt(file_descriptor, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) < 0) {
debugMessage(LOG_QUERY, "[{}] Failed to enabled TCP no delay to flush the direct query disconnect socket ({} | {}).", logging_address(address), errno, strerror(errno));
}
_send(config::query::newlineCharacter.data(), config::query::newlineCharacter.size());
if(shutdown(file_descriptor, SHUT_RDWR) < 0) {
debugMessage(LOG_QUERY, "[{}] Failed to shutdown socket ({} | {}).", logging_address(address), errno, strerror(errno));
}
if(close(file_descriptor) < 0) {
debugMessage(LOG_QUERY, "[{}] Failed to close socket ({} | {}).", logging_address(address), errno, strerror(errno));
}
}
//dummyfdflood
//dummyfdflood clear
void QueryServer::on_client_receive(int _server_file_descriptor, short ev, void *arg) {
sockaddr_storage remote_address{};
memset(&remote_address, 0, sizeof(sockaddr_in));
socklen_t address_length = sizeof(remote_address);
int file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length);
if (file_descriptor < 0) {
if(errno == EAGAIN)
return;
if(errno == EMFILE || errno == ENFILE) {
if(errno == EMFILE)
logError(LOG_QUERY, "Server ran out file descriptors. Please increase the process file descriptor limit or decrease the instance variable 'serverinstance_query_max_connections'");
else
logError(LOG_QUERY, "Server ran out file descriptors. Please increase the process and system-wide file descriptor limit or decrease the instance variable 'serverinstance_query_max_connections'");
bool tmp_close_success = false;
{
lock_guard reserve_fd_lock(server_reserve_fd_lock);
if(this->server_reserve_fd > 0) {
debugMessage(LOG_QUERY, "Trying to accept client with the reserved file descriptor to send him a protocol limit reached exception.");
auto _ = [&]{
if(close(this->server_reserve_fd) < 0) {
debugMessage(LOG_QUERY, "Failed to close reserved file descriptor");
tmp_close_success = false;
return;
}
this->server_reserve_fd = 0;
errno = 0;
file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length);
if(file_descriptor < 0) {
if(errno == EMFILE || errno == ENFILE)
debugMessage(LOG_QUERY, "[{}] Even with freeing the reserved descriptor accept failed. Attempting to reclaim reserved file descriptor", logging_address(remote_address));
else if(errno == EAGAIN);
else {
debugMessage(LOG_QUERY, "[{}] Failed to accept client with reserved file descriptor. ({} | {})", logging_address(remote_address), errno, strerror(errno));
}
this->server_reserve_fd = dup(1);
if(this->server_reserve_fd < 0)
debugMessage(LOG_QUERY, "[{}] Failed to reclaim reserved file descriptor. Future clients cant be accepted!", logging_address(remote_address));
else
tmp_close_success = true;
return;
}
debugMessage(LOG_QUERY, "[{}] Successfully accepted client via reserved descriptor (fd: {}). Initializing socket and sending MOTD and disconnect.", logging_address(remote_address), file_descriptor);
static auto resource_limit_error = R"(error id=57344 msg=query\sserver\sresource\slimit\sreached extra_msg=file\sdescriptor\slimit\sexceeded)";
send_direct_disconnect(remote_address, file_descriptor, resource_limit_error, strlen(resource_limit_error));
this->server_reserve_fd = dup(1);
if(this->server_reserve_fd < 0)
debugMessage(LOG_QUERY, "Failed to reclaim reserved file descriptor. Future clients cant be accepted!");
else
tmp_close_success = true;
logMessage(LOG_QUERY, "[{}] Dropping new query connection attempt because of too many open file descriptors.", logging_address(remote_address));
};
_();
}
}
if(!tmp_close_success) {
debugMessage(LOG_QUERY, "Sleeping two seconds because we're currently having no resources for this user. (Removing the accept event)");
for(auto& binding : this->bindings)
event_del_noblock(binding->event_accept);
accept_event_deleted = system_clock::now();
return;
}
return;
}
logMessage(LOG_QUERY, "Got an error while accepting a new client. (errno: {}, message: {})", errno, strerror(errno));
return;
}
{
unique_lock lock(this->connected_clients_lock);
auto max_connections = serverInstance->properties()[property::SERVERINSTANCE_QUERY_MAX_CONNECTIONS].as<size_t>();
if(max_connections > 0 && max_connections <= this->connectedClients.size()) {
lock.unlock();
logMessage(LOG_QUERY, "[{}] Dropping new query connection attempt because of too many connected query clients.", logging_address(remote_address));
static auto query_server_full = R"(error id=4611 msg=max\sclients\sreached)";
send_direct_disconnect(remote_address, file_descriptor, query_server_full, strlen(query_server_full));
return;
}
shared_ptr<QueryClient> client = std::make_shared<QueryClient>(this, acceptedSocketFd);
auto max_ip_connections = serverInstance->properties()[property::SERVERINSTANCE_QUERY_MAX_CONNECTIONS_PER_IP].as<size_t>();
if(max_ip_connections > 0) {
size_t connection_count = 0;
for(auto& client : this->connectedClients) {
if(net::address_equal(client->remote_address, remote_address))
connection_count++;
}
if(connection_count >= max_ip_connections) {
lock.unlock();
logMessage(LOG_QUERY, "[{}] Dropping new query connection attempt because of too many simultaneously connected session from this ip.", logging_address(remote_address));
static auto query_server_full = R"(error id=4610 msg=too\smany\ssimultaneously\sconnected\ssessions)";//
send_direct_disconnect(remote_address, file_descriptor, query_server_full, strlen(query_server_full));
return;
}
}
}
shared_ptr<QueryClient> client = std::make_shared<QueryClient>(this, file_descriptor);
client->applySelfLock(client);
memcpy(&client->remote_address, &remoteAddr, sizeof(sockaddr_in));
memcpy(&client->remote_address, &remote_address, sizeof(remote_address));
this->clientLock.lock();
this->connectedClients.push_back(client);
this->clientLock.unlock();
{
lock_guard lock(this->connected_clients_lock);
this->connectedClients.push_back(client);
}
client->preInitialize();
if(client->readEvent)
event_add(client->readEvent, nullptr);
logMessage(LOG_QUERY, "Got new client from {}", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()));
if(client->readEvent) {
event_add(client->readEvent, nullptr);
}
logMessage(LOG_QUERY, "Got new client from {}", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()));
}
/*
@ -351,7 +555,7 @@ void QueryServer::tick() {
decltype(this->connectedClients) clCopy;
{
threads::MutexLock lock(this->clientLock);
lock_guard lock(this->connected_clients_lock);
clCopy = this->connectedClients;
}
for(const auto& cl : clCopy) cl->queryTick();
@ -380,4 +584,10 @@ void QueryServer::tick() {
this->loginAttempts.erase(ip);
}
}
if(this->accept_event_deleted.time_since_epoch().count() != 0 && accept_event_deleted + seconds(5) < system_clock::now()) {
debugMessage(LOG_QUERY, "Readding accept event and try again if we have enough resources again.");
for(auto& binding : this->bindings)
event_add(binding->event_accept, nullptr);
accept_event_deleted = system_clock::time_point{};
}
}

View File

@ -12,6 +12,7 @@
#include <sql/SqlQuery.h>
#include "../Group.h"
#include <event.h>
#include <misc/net.h>
#include "../manager/IpListManager.h"
namespace ts {
@ -50,13 +51,20 @@ namespace ts {
class QueryServer {
friend class QueryClient;
public:
struct Binding {
sockaddr_storage address{};
int file_descriptor = 0;
::event* event_accept = nullptr;
inline std::string as_string() { return net::to_string(address, true); }
};
explicit QueryServer(sql::SqlManager*);
~QueryServer();
bool start(const sockaddr_in&, std::string&);
bool start(const std::deque<std::shared_ptr<Binding>>& /* bindings */, std::string& /* error */);
void stop();
bool running(){ return active; }
sockaddr_in* boundedAddress(){ return boundAddress; }
void unregisterConnection(const std::shared_ptr<QueryClient> &);
@ -83,22 +91,24 @@ namespace ts {
threads::ThreadPool* executePool() { return this->_executePool; }
private:
sql::SqlManager* sql;
sockaddr_in* boundAddress = nullptr;
bool active = false;
int serverSocket;
std::deque<std::shared_ptr<Binding>> bindings;
std::vector<threads::Thread*> threads;
std::mutex server_reserve_fd_lock;
int server_reserve_fd = -1; /* -1 = unset | 0 = in use | > 0 ready to use */
std::unique_ptr<IpListManager> ip_whitelist;
std::unique_ptr<IpListManager> ip_blacklist;
//IO stuff
event_base* eventLoop = nullptr;
::event* acceptEvent = nullptr;
std::chrono::system_clock::time_point accept_event_deleted;
threads::ThreadPool* _executePool = nullptr;
threads::Mutex clientLock;
std::mutex connected_clients_lock;
std::deque<std::shared_ptr<QueryClient>> connectedClients;
threads::Mutex loginLock;
@ -107,8 +117,8 @@ namespace ts {
std::map<std::string, std::chrono::system_clock::time_point> queryBann;
threads::Thread* ioThread = nullptr;
threads::SchedulingTask tickingId = 0;
void onClientAccept(int fd, short ev, void *arg);
threads::SchedulingTask tickingId = nullptr;
void on_client_receive(int fd, short ev, void *arg);
void tick();
};
}

View File

@ -63,7 +63,7 @@ std::shared_ptr<file::FileEntry> FileServer::findFile(std::string path, std::sha
fs::path absPath = fs::u8path(path);
if(!fs::is_regular_file(absPath) && !fs::is_directory(absPath)){
debugMessage(lstream << "Could not find requested file. Abs path: " << absPath << "|" << path << ". (path=" << path << ", parent=" << (parent ? parent->path + "/" + parent->name : "./") << ")");
debugMessage(LOG_FT, "Could not find requested file. Abs path: {} | {}. (path={}, parent={})", absPath.string(), path, path, (parent ? parent->path + "/" + parent->name : "./"));
return nullptr;
}
@ -113,7 +113,7 @@ std::vector<std::shared_ptr<file::FileEntry>> FileServer::listFiles(std::shared_
entry->lastChanged = fs::last_write_time(elm.path());
result.push_back(entry);
} else {
logError("Invalid file in file tree. File path: " + elm.path().string());
logError(LOG_FT, "Invalid file in file tree. File path: " + elm.path().string());
}
}
@ -207,7 +207,7 @@ std::shared_ptr<file::FileTransfereKey> FileServer::generateUploadTransferKey(st
threads::MutexLock lock(this->keylock);
pendingKeys.push_back(result);
}
debugMessage("Created file upload key=" + result->key + " for " + targetFile + " (" + to_string(size) + " bytes)");
debugMessage(LOG_FT, "Created file upload key=" + result->key + " for " + targetFile + " (" + to_string(size) + " bytes)");
return result;
}
@ -216,8 +216,10 @@ std::shared_ptr<file::Directory> FileServer::resolveDirectory(const shared_ptr<T
if(!findFile(path))
this->createDirectory(path.string(), nullptr);
path += subPath;
logMessage(lstream << "resolve " << path.string() << " -> " << findFile(path.string()) << " -> " << typeid(findFile(path.string())).name() << endl);
return static_pointer_cast<file::Directory>(findFile(path.string()));
auto ffile = findFile(path.string());
debugMessage(LOG_FT, "Resolve {} => {} -> {}", path.string(), (void*) ffile.get(), typeid(ffile).name());
return static_pointer_cast<file::Directory>(ffile);
}
std::shared_ptr<file::Directory> FileServer::iconDirectory(const shared_ptr<TSServer> &server) {
@ -268,56 +270,90 @@ void FileServer::deleteServer(const shared_ptr<TSServer> &server) {
if(fs::exists(path)) {
error_code error;
if(fs::remove_all(path, error) == 0)
logError(0, "Could not delete server directory {} ({} | {})", path.string(), error.value(), error.message());
logError(LOG_FT, "Could not delete server directory {} ({} | {})", path.string(), error.value(), error.message());
} else {
logError(0, "Could not delete missing server directory (" + path.string() + ")");
logError(LOG_FT, "Could not delete missing server directory (" + path.string() + ")");
}
}
//The actual server!
bool FileServer::start(const sockaddr_in& localAdress) {
if(this->running()) return false;
bool FileServer::start(const std::deque<std::shared_ptr<FileServer::Binding>>& bindings, std::string& error) {
if(this->running()) {
error = "server already running";
return false;
}
this->active = true;
boundAddress = new sockaddr_in;
memcpy(boundAddress, &localAdress, sizeof(localAdress));
this->serverSocket = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (serverSocket < 0) {
logCritical(LOG_FT, "Cant create server socket for file server");
return false;
}
int enable = 1;
int disabled = 0;
if (setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) {
logError(LOG_FT, "setsockopt(SO_REUSEADDR) failed");
}
if(setsockopt(serverSocket, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) {
logError(LOG_FT, lstream << "Cant disable nopush! Error: "+to_string(errno)+" / "+strerror(errno) << endl);
}
if(fcntl(serverSocket, F_SETFD, FD_CLOEXEC) < 0) {
logError(LOG_QUERY, "Failed to enable FD_CLOEXEC for {} (QueryServer)", serverSocket);
/* reserve backup file descriptor in case that the max file descriptors have been reached */
{
this->server_reserve_fd = dup(1);
if(this->server_reserve_fd < 0)
logWarning(LOG_FT, "Failed to reserve a backup accept file descriptor. ({} | {})", errno, strerror(errno));
}
if (bind(serverSocket, (struct sockaddr *) &localAdress, sizeof(localAdress)) < 0) {
logError(LOG_FT, lstream << "Cant bind server socket (" << strerror(errno) << ")" << endl);
return false;
}
if(listen(serverSocket, 255) < 0){
logError(LOG_FT, lstream << "Cant listen on server socket (" << strerror(errno) << ")" << endl);
return false;
}
/* setup event bases */
{
this->ioLoop = event_base_new();
this->ioThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [&]{
while(this->active) {
debugMessage(LOG_FT, "Entering event loop ({})", (void*) this->ioLoop);
event_base_loop(this->ioLoop, EVLOOP_NO_EXIT_ON_EMPTY);
if(this->active) {
debugMessage(LOG_FT, "Event loop exited ({}). No active events. Sleeping 1 seconds", (void*) this->ioLoop);
this_thread::sleep_for(seconds(1));
} else {
debugMessage(LOG_FT, "Event loop exited ({})", (void*) this->ioLoop);
}
}
});
this->ioThread->name("File IO #1").execute();
}
ioLoop = event_base_new();
this->acceptEvent = event_new(this->ioLoop, this->serverSocket, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((FileServer*) c)->onClientAccept(a, b, c); }, this);
event_add(this->acceptEvent, nullptr);
{
for(auto& binding : bindings) {
binding->file_descriptor = socket(binding->address.ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
if(binding->file_descriptor < 0) {
logError(LOG_FT, "Failed to bind server to {}. (Failed to create socket: {} | {})", binding->as_string(), errno, strerror(errno));
continue;
}
this->ioThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [&](){
event_base_dispatch(this->ioLoop);
debugMessage(LOG_FT, "File-Server accept thread terminated");
});
this->ioThread->name("File IO #1").execute();
int enable = 1, disabled = 0;
if (setsockopt(binding->file_descriptor, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
logWarning(LOG_FT, "Failed to activate SO_REUSEADDR for binding {} ({} | {})", binding->as_string(), errno, strerror(errno));
if(setsockopt(binding->file_descriptor, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0)
logWarning(LOG_FT, "Failed to deactivate TCP_NOPUSH for binding {} ({} | {})", binding->as_string(), errno, strerror(errno));
if(binding->address.ss_family == AF_INET6) {
if(setsockopt(binding->file_descriptor, IPPROTO_IPV6, IPV6_V6ONLY, &enable, sizeof(int)) < 0)
logWarning(LOG_FT, "Failed to activate IPV6_V6ONLY for IPv6 binding {} ({} | {})", binding->as_string(), errno, strerror(errno));
}
if(fcntl(binding->file_descriptor, F_SETFD, FD_CLOEXEC) < 0)
logWarning(LOG_FT, "Failed to set flag FD_CLOEXEC for binding {} ({} | {})", binding->as_string(), errno, strerror(errno));
if (bind(binding->file_descriptor, (struct sockaddr *) &binding->address, sizeof(binding->address)) < 0) {
logError(LOG_FT, "Failed to bind server to {}. (Failed to bind socket: {} | {})", binding->as_string(), errno, strerror(errno));
close(binding->file_descriptor);
continue;
}
if (listen(binding->file_descriptor, SOMAXCONN) < 0) {
logError(LOG_FT, "Failed to bind server to {}. (Failed to listen: {} | {})", binding->as_string(), errno, strerror(errno));
close(binding->file_descriptor);
continue;
}
binding->event_accept = event_new(this->ioLoop, binding->file_descriptor, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((FileServer *) c)->on_client_accept(a, b, c); }, this);
event_add(binding->event_accept, nullptr);
this->bindings.push_back(binding);
}
if(this->bindings.empty()) {
this->stop();
error = "failed to bind to any address";
return false;
}
}
for(int index = 0; index < 2; index++){
auto th = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, &FileServer::clientTickingExecutor, this);
@ -335,16 +371,26 @@ void FileServer::stop() {
this->tickingCon.notify_all();
}
if(this->acceptEvent) {
event_del(this->acceptEvent);
event_free(this->acceptEvent);
this->acceptEvent = nullptr;
}
for(auto& binding : this->bindings) {
if(binding->event_accept) {
event_del_block(binding->event_accept);
event_free(binding->event_accept);
binding->event_accept = nullptr;
}
if(binding->file_descriptor > 0) {
if(shutdown(binding->file_descriptor, SHUT_RDWR) < 0)
logWarning(LOG_FT, "Failed to shutdown socket for binding {} ({} | {}).", binding->as_string(), errno, strerror(errno));
if(close(binding->file_descriptor) < 0)
logError(LOG_FT, "Failed to close socket for binding {} ({} | {}).", binding->as_string(), errno, strerror(errno));
binding->file_descriptor = -1;
}
}
this->bindings.clear();
auto clClone = this->connectedClients;
for(const auto& cl : clClone) {
cl->disconnect(chrono::milliseconds(1));
}
auto clClone = this->connectedClients;
for(const auto& cl : clClone) {
cl->disconnect(chrono::milliseconds(1));
}
if(this->ioLoop) {
event_base_loopbreak(this->ioLoop);
@ -370,40 +416,135 @@ void FileServer::stop() {
this->ioLoop = nullptr;
}
delete this->boundAddress;
this->boundAddress = nullptr;
if(this->serverSocket > 0){
if(shutdown(this->serverSocket, SHUT_RDWR) < 0) logError(LOG_FT, "Could not shutdown server socket!");
if(close(this->serverSocket) < 0) logError(LOG_FT, "Could not close server socket!");
}
this->serverSocket = 0;
if(this->server_reserve_fd > 0) {
if(close(this->server_reserve_fd) < 0)
logError(LOG_FT, "Failed to close backup file descriptor ({} | {})", errno, strerror(errno));
}
this->server_reserve_fd = -1;
}
inline std::string logging_address(const sockaddr_storage& address) {
if(config::server::disable_ip_saving)
return "[0|X.X.X.X" + to_string(net::port(address)) + "|unconnected]";
return "[0|X.X.X.X" + net::to_string(address, true) + "|unconnected]";
}
void FileServer::onClientAccept(int fd, short ev, void *arg) {
sockaddr_in remoteAddr{};
memset(&remoteAddr, 0, sizeof(sockaddr_in));
socklen_t addrLength = sizeof(remoteAddr);
#define CLOSE_CONNECTION \
if(shutdown(file_descriptor, SHUT_RDWR) < 0) { \
debugMessage(LOG_FT, "[{}] Failed to shutdown socket ({} | {}).", logging_address(remote_address), errno, strerror(errno)); \
} \
if(close(file_descriptor) < 0) { \
debugMessage(LOG_FT, "[{}] Failed to close socket ({} | {}).", logging_address(remote_address), errno, strerror(errno)); \
}
int acceptedSocketFd = accept(serverSocket, (struct sockaddr *) &remoteAddr, &addrLength);
if (acceptedSocketFd < 0) {
if(errno == EAGAIN){ //No manager
return;
}
logError(LOG_FT, "Having an error while accepting a new client. ({}/{})", errno, strerror(errno));
return;
}
void FileServer::on_client_accept(int _server_file_descriptor, short ev, void *arg) {
sockaddr_storage remote_address{};
memset(&remote_address, 0, sizeof(remote_address));
socklen_t address_length = sizeof(remote_address);
shared_ptr<FileClient> client = std::make_shared<FileClient>(this, acceptedSocketFd);
int file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length);
if (file_descriptor < 0) {
if(errno == EAGAIN)
return;
if(errno == EMFILE || errno == ENFILE) {
if(errno == EMFILE)
logError(LOG_FT, "Server ran out file descriptors. Please increase the process file descriptor limit or decrease the instance variable 'serverinstance_filetransfer_max_connections'");
else
logError(LOG_FT, "Server ran out file descriptors. Please increase the process and system-wide file descriptor limit or decrease the instance variable 'serverinstance_filetransfer_max_connections'");
bool tmp_close_success = false;
{
lock_guard reserve_fd_lock(server_reserve_fd_lock);
if(this->server_reserve_fd > 0) {
debugMessage(LOG_FT, "Trying to accept client with the reserved file descriptor to close the incomming connection.");
auto _ = [&]{
if(close(this->server_reserve_fd) < 0) {
debugMessage(LOG_FT, "Failed to close reserved file descriptor");
tmp_close_success = false;
return;
}
this->server_reserve_fd = 0;
errno = 0;
file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length);
if(file_descriptor < 0) {
if(errno == EMFILE || errno == ENFILE)
debugMessage(LOG_FT, "[{}] Even with freeing the reserved descriptor accept failed. Attempting to reclaim reserved file descriptor", logging_address(remote_address));
else if(errno == EAGAIN);
else {
debugMessage(LOG_FT, "[{}] Failed to accept client with reserved file descriptor. ({} | {})", logging_address(remote_address), errno, strerror(errno));
}
this->server_reserve_fd = dup(1);
if(this->server_reserve_fd < 0)
debugMessage(LOG_FT, "[{}] Failed to reclaim reserved file descriptor. Future clients cant be accepted!", logging_address(remote_address));
else
tmp_close_success = true;
return;
}
debugMessage(LOG_FT, "[{}] Successfully accepted client via reserved descriptor (fd: {}). Disconnecting client.", logging_address(remote_address), file_descriptor);
CLOSE_CONNECTION
this->server_reserve_fd = dup(1);
if(this->server_reserve_fd < 0)
debugMessage(LOG_FT, "Failed to reclaim reserved file descriptor. Future clients cant be accepted!");
else
tmp_close_success = true;
logMessage(LOG_FT, "[{}] Dropping file transfer connection attempt because of too many open file descriptors.", logging_address(remote_address));
};
_();
}
}
if(!tmp_close_success) {
debugMessage(LOG_FT, "Sleeping two seconds because we're currently having no resources for this user. (Removing the accept event)");
for(auto& binding : this->bindings)
event_del_noblock(binding->event_accept);
accept_event_deleted = system_clock::now();
return;
}
return;
}
logMessage(LOG_FT, "Got an error while accepting a new client. (errno: {}, message: {})", errno, strerror(errno));
return;
}
{
unique_lock lock(this->clientLock);
auto max_connections = serverInstance->properties()[property::SERVERINSTANCE_FILETRANSFER_MAX_CONNECTIONS].as<size_t>();
if(max_connections > 0 && max_connections <= this->connectedClients.size()) {
lock.unlock();
logMessage(LOG_FT, "[{}] Dropping new connection attempt because of too many connected clients.", logging_address(remote_address));
CLOSE_CONNECTION
return;
}
auto max_ip_connections = serverInstance->properties()[property::SERVERINSTANCE_FILETRANSFER_MAX_CONNECTIONS_PER_IP].as<size_t>();
if(max_ip_connections > 0) {
size_t connection_count = 0;
for(auto& client : this->connectedClients) {
if(net::address_equal(client->remote_address, remote_address))
connection_count++;
}
if(connection_count >= max_ip_connections) {
lock.unlock();
logMessage(LOG_FT, "[{}] Dropping new connection attempt because of too many simultaneously connected session from this ip.", logging_address(remote_address));
CLOSE_CONNECTION
return;
}
}
}
shared_ptr<FileClient> client = std::make_shared<FileClient>(this, file_descriptor);
client->_this = client;
memcpy(&client->remoteAddress, &remoteAddr, sizeof(sockaddr_in));
memcpy(&client->remote_address, &remote_address, sizeof(remote_address));
this->clientLock.lock();
this->connectedClients.push_back(client);
this->clientLock.unlock();
event_add(client->readEvent, nullptr);
logMessage(LOG_FT, "Got new client from {}", config::server::disable_ip_saving ? "X.X.X.X" : string(inet_ntoa(client->remoteAddress.sin_addr)) + ":" + to_string(ntohs(client->remoteAddress.sin_port)));
logMessage(LOG_FT, "[{}] Remote peer connected. Initializing session.", logging_address(remote_address));
}
void FileServer::clientTickingExecutor() {
@ -468,6 +609,12 @@ void FileServer::instanceTick() {
this->tickQueue.insert(this->tickQueue.end(), client.begin(), client.end()); //Tick all clients :)
this->tickingCon.notify_all();
}
if(this->accept_event_deleted.time_since_epoch().count() != 0 && accept_event_deleted + seconds(5) < system_clock::now()) {
debugMessage(LOG_FT, "Readding accept event and try again if we have enough resources again.");
for(auto& binding : this->bindings)
event_add(binding->event_accept, nullptr);
accept_event_deleted = system_clock::time_point{};
}
auto now = system_clock::now();
if(timestamp_bandwidth_update + seconds(1) < now) {

View File

@ -12,6 +12,7 @@
#include <condition_variable>
#include "Variable.h"
#include <Definitions.h>
#include <misc/net.h>
namespace ts {
namespace file {
@ -78,15 +79,23 @@ namespace ts {
class FileServer {
friend class FileClient;
public:
struct Binding {
sockaddr_storage address{};
int file_descriptor = 0;
::event* event_accept = nullptr;
inline std::string as_string() { return net::to_string(address, true); }
};
FileServer();
~FileServer();
bool start(const sockaddr_in&);
bool start(const std::deque<std::shared_ptr<Binding>>& /* bindings */, std::string& /* error */);
void stop();
bool running(){ return active; }
sockaddr_in* boundedAddress(){ return boundAddress; }
ts_always_inline bool running(){ return active; }
ts_always_inline std::deque<std::shared_ptr<Binding>> list_bindings() { return this->bindings; }
std::shared_ptr<file::Directory> createDirectory(std::string name, std::shared_ptr<file::Directory> parent);
std::shared_ptr<file::Directory> createDirectory(std::string name, std::shared_ptr<file::Directory> parent);
bool fileExists(std::shared_ptr<file::Directory>);
bool fileExists(std::shared_ptr<file::File>);
std::shared_ptr<file::FileEntry> findFile(std::string, std::shared_ptr<file::Directory> = nullptr);
@ -112,15 +121,16 @@ namespace ts {
std::deque<std::shared_ptr<FileClient>> running_file_transfers(const std::shared_ptr<ConnectedClient> & /* client */ = nullptr);
std::deque<std::shared_ptr<file::FileTransfereKey>> pending_file_transfers(const std::shared_ptr<ConnectedClient> & /* client */ = nullptr);
private:
sockaddr_in* boundAddress = nullptr;;
bool active = false;
int serverSocket;
std::deque<std::shared_ptr<Binding>> bindings;
std::string rootPath = "./files/";
//IO stuff
event_base* ioLoop = nullptr;
::event* acceptEvent = nullptr;
std::chrono::system_clock::time_point accept_event_deleted;
std::mutex server_reserve_fd_lock;
int server_reserve_fd = -1; /* -1 = unset | 0 = in use | > 0 ready to use */
threads::Mutex clientLock;
std::deque<std::shared_ptr<FileClient>> connectedClients;
@ -130,7 +140,7 @@ namespace ts {
}
threads::Thread* ioThread = nullptr;
void onClientAccept(int fd, short ev, void *arg);
void on_client_accept(int fd, short ev, void *arg);
std::deque<std::shared_ptr<FileClient>> tickQueue;
std::deque<threads::Thread*> tickingThreads;

View File

@ -9,6 +9,7 @@
#include <misc/time.h>
#include <misc/memtracker.h>
#include <sql/sqlite/SqliteSQL.h>
#include <sys/resource.h>
#include "CommandHandler.h"
#include "src/server/QueryServer.h"
@ -65,6 +66,8 @@ namespace terminal {
handleCommandPermGrant(cmd);
else if(cmd.lcommand == "dummycrash" || cmd.lcommand == "dummy_crash")
handleCommandDummyCrash(cmd);
else if(cmd.lcommand == "dummyfdflood" || cmd.lcommand == "dummy_fdflood")
handleCommandDummyFdFlood(cmd);
else if(cmd.lcommand == "meminfo")
handleCommandMemInfo(cmd);
else if(cmd.lcommand == "spoken")
@ -457,5 +460,38 @@ namespace terminal {
logMessage("Monthly statistics will be reset");
return true;
}
deque<int> fd_leaks;
bool handleCommandDummyFdFlood(TerminalCommand& cmd) {
size_t value;
if(cmd.arguments.size() < 1) {
value = 1024;
rlimit limit{1024, 10000};
setrlimit(7, &limit);
} else if(cmd.larguments[0] == "clear") {
logMessage("Clearup leaks");
for(auto& fd : fd_leaks)
close(fd);
fd_leaks.clear();
return;
} else {
value = cmd.arguments[0].as<size_t>();
}
logMessage("Leaking {} file descriptors", value);
size_t index = 0;
while(index < value) {
auto fd = dup(1);
if(fd < 0)
logMessage("Failed to create a file descriptor {} | {}", errno, strerror(errno));
else
fd_leaks.push_back(fd);
index++;
}
return true;
}
}
}

View File

@ -18,6 +18,7 @@ namespace terminal {
extern void handleCommand(std::string);
extern bool handleCommandDummyCrash(TerminalCommand&);
extern bool handleCommandDummyFdFlood(TerminalCommand&);
extern bool handleCommandHelp(TerminalCommand&);
extern bool handleCommandEnd(TerminalCommand&);

2
shared

@ -1 +1 @@
Subproject commit d9ddc2c06d7731b14cae47f67a3414ce47e34bae
Subproject commit a0cca36eca11da410626a340dbe4377067d59c1b