Using one global event loop for the query and web client

This commit is contained in:
WolverinDEV 2021-04-15 12:54:52 +02:00
parent 6e2e005ed7
commit abeeae4ed5
30 changed files with 529 additions and 551 deletions

View File

@ -76,6 +76,7 @@ set(SERVER_SOURCE_FILES
src/client/query/QueryClientNotify.cpp
src/manager/IpListManager.cpp
src/server/GlobalNetworkEvents.cpp
src/ConnectionStatistics.cpp
@ -116,7 +117,6 @@ set(SERVER_SOURCE_FILES
src/client/query/XMacroEventTypes.h
src/server/VoiceIOManager.cpp
src/server/WebIoManager.cpp
src/client/SpeakingClient.cpp
../shared/src/ssl/SSLManager.cpp

View File

@ -122,6 +122,7 @@ std::string config::messages::timeout::connection_reinitialized;
size_t config::threads::ticking;
size_t config::threads::command_execute;
size_t config::threads::network_events;
size_t config::threads::voice::events_per_server;
size_t config::threads::voice::io_min;
size_t config::threads::voice::io_per_server;
@ -1835,6 +1836,12 @@ std::deque<std::shared_ptr<EntryBinding>> config::create_bindings() {
ADD_DESCRIPTION("Command executors");
ADD_SENSITIVE();
}
{
CREATE_BINDING("network_events", 0);
BIND_INTEGRAL(config::threads::network_events, 4, 1, 128);
ADD_DESCRIPTION("Network event loops");
ADD_SENSITIVE();
}
{
BIND_GROUP(voice)
{

View File

@ -231,6 +231,7 @@ namespace ts::config {
namespace threads {
extern size_t ticking;
extern size_t command_execute;
extern size_t network_events;
namespace voice {
extern size_t events_per_server;

View File

@ -9,6 +9,7 @@
#include "src/server/QueryServer.h"
#include "src/manager/PermissionNameMapper.h"
#include "./FileServerHandler.h"
#include "./server/GlobalNetworkEvents.h"
#include <ThreadPool/Timer.h>
#include "ShutdownHelper.h"
#include <sys/utsname.h>
@ -270,6 +271,13 @@ bool InstanceHandler::startInstance() {
string errorMessage;
this->server_command_executor_ = std::make_shared<ServerCommandExecutor>(ts::config::threads::command_execute);
this->network_event_loop_ = std::make_unique<NetworkEventLoop>(ts::config::threads::network_events);
if(!this->network_event_loop_->initialize()) {
this->server_command_executor_ = nullptr;
this->network_event_loop_ = nullptr;
logCritical(LOG_INSTANCE, "Failed to initialize network event loop");
return false;
}
this->permission_mapper = make_shared<permission::PermissionNameMapper>();
if(!this->permission_mapper->initialize(config::permission_mapping_file, errorMessage)) {
@ -376,7 +384,6 @@ FwIDAQAB
logCritical(LOG_GENERAL, "Failed to initialize WebClient TeaForum key! ({})", error);
return false;
}
this->web_event_loop = make_shared<webio::LoopManager>();
}
#endif
@ -467,10 +474,11 @@ void InstanceHandler::stopInstance() {
delete this->sslMgr;
this->sslMgr = nullptr;
this->web_event_loop = nullptr;
this->license_service_->shutdown();
this->server_command_executor_ = nullptr;
this->network_event_loop_->shutdown();
this->network_event_loop_ = nullptr;
}
void InstanceHandler::tickInstance() {

View File

@ -7,7 +7,6 @@
#include <src/lincense/LicenseService.h>
#include "manager/SqlDataManager.h"
#include "lincense/TeamSpeakLicense.h"
#include "server/WebIoManager.h"
#include <misc/task_executor.h>
namespace ts {
@ -32,6 +31,7 @@ namespace ts {
class GroupManager;
}
class NetworkEventLoop;
class ServerCommandExecutor;
class InstanceHandler;
@ -80,13 +80,13 @@ namespace ts {
bool resetMonthlyStats();
[[nodiscard]] inline const auto& general_task_executor(){ return this->general_task_executor_; }
[[nodiscard]] inline const auto& network_event_loop(){ return this->network_event_loop_; }
[[nodiscard]] inline std::shared_ptr<stats::ConnectionStatistics> getStatistics(){ return statistics; }
[[nodiscard]] std::shared_ptr<license::InstanceLicenseInfo> generateLicenseData();
[[nodiscard]] inline std::shared_ptr<TeamSpeakLicense> getTeamSpeakLicense() { return this->teamspeak_license; }
[[nodiscard]] inline PropertyWrapper getDefaultServerProperties() { return PropertyWrapper{this->default_server_properties}; }
[[nodiscard]] inline std::shared_ptr<webio::LoopManager> getWebIoLoop() { return this->web_event_loop; }
[[nodiscard]] inline std::shared_ptr<permission::PermissionNameMapper> getPermissionMapper() { return this->permission_mapper; }
[[nodiscard]] inline std::shared_ptr<ts::event::EventExecutor> getConversationIo() { return this->conversation_io; }
@ -118,13 +118,13 @@ namespace ts {
ssl::SSLManager* sslMgr = nullptr;
file::FileServerHandler* file_server_handler_{nullptr};
std::unique_ptr<log::ActionLogger> action_logger_{nullptr};
std::unique_ptr<NetworkEventLoop> network_event_loop_{nullptr};
std::shared_ptr<ts::PropertyManager> _properties{};
std::shared_ptr<ServerCommandExecutor> server_command_executor_{};
std::shared_ptr<ts::event::EventExecutor> conversation_io = nullptr;
std::shared_ptr<webio::LoopManager> web_event_loop = nullptr;
std::shared_ptr<ts::PropertyManager> default_server_properties = nullptr;
std::shared_mutex default_tree_lock;

View File

@ -22,100 +22,120 @@ bool VirtualServer::registerClient(shared_ptr<ConnectedClient> client) {
sassert(client);
{
lock_guard lock(this->clients.lock);
std::lock_guard clients_lock{this->clients_mutex};
if(client->getClientId() > 0) {
logCritical(this->getServerId(), "Client {} ({}|{}) has been already registered!", client->getDisplayName(), client->getClientId(), client->getUid());
return false;
}
ClientId client_id = 0;
ClientId max_client_id = this->clients.clients.size();
while(client_id < max_client_id && this->clients.clients[client_id])
ClientId client_id{0};
while(this->clients.count(client_id)) {
client_id++;
if(client_id == max_client_id)
this->clients.clients.push_back(client);
else
this->clients.clients[client_id] = client;
this->clients.count++;
}
this->clients.emplace(client_id, client);
client->setClientId(client_id);
}
{
lock_guard lock(this->client_nickname_lock);
std::lock_guard lock{this->client_nickname_lock};
auto login_name = client->getDisplayName();
while(login_name.length() < 3)
login_name += ".";
if(client->getExternalType() == ClientType::CLIENT_TEAMSPEAK)
if(client->getExternalType() == ClientType::CLIENT_TEAMSPEAK) {
client->properties()[property::CLIENT_LOGIN_NAME] = login_name;
}
std::shared_ptr<ConnectedClient> found_client = nullptr;
while(login_name.length() < 3) {
login_name += ".";
}
auto client_name = login_name;
size_t counter = 0;
std::shared_ptr<ConnectedClient> found_client{nullptr};
auto registered_clients = this->getClients();
auto client_name{login_name};
size_t counter{0};
{
lock_guard clients_lock(this->clients.lock);
while(true) {
for(auto& _client : this->clients.clients) {
if(!_client) continue;
if(_client->getDisplayName() == client_name && _client != client)
for(auto& _client : registered_clients) {
if(_client->getDisplayName() == client_name && _client != client) {
goto increase_name;
}
}
goto nickname_valid;
increase_name:
client_name = login_name + to_string(++counter);
client_name = login_name + std::to_string(++counter);
}
}
nickname_valid:
client->setDisplayName(client_name);
}
switch(client->getType()) {
case ClientType::CLIENT_TEAMSPEAK:
case ClientType::CLIENT_TEASPEAK:
case ClientType::CLIENT_WEB:
this->properties()[property::VIRTUALSERVER_CLIENT_CONNECTIONS].increment_by<uint64_t>(1); //increase manager connections
this->properties()[property::VIRTUALSERVER_LAST_CLIENT_CONNECT] = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
break;
case ClientType::CLIENT_QUERY:
this->properties()[property::VIRTUALSERVER_LAST_QUERY_CONNECT] = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
this->properties()[property::VIRTUALSERVER_QUERY_CLIENT_CONNECTIONS].increment_by<uint64_t>(1); //increase manager connections
break;
case ClientType::CLIENT_MUSIC:
case ClientType::CLIENT_INTERNAL:
break;
if(client->getType() == ClientType::CLIENT_TEAMSPEAK || client->getType() == ClientType::CLIENT_WEB) {
this->properties()[property::VIRTUALSERVER_CLIENT_CONNECTIONS].increment_by<uint64_t>(1); //increase manager connections
this->properties()[property::VIRTUALSERVER_LAST_CLIENT_CONNECT] = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
}
else if(client->getType() == ClientType::CLIENT_QUERY) {
this->properties()[property::VIRTUALSERVER_LAST_QUERY_CONNECT] = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
this->properties()[property::VIRTUALSERVER_QUERY_CLIENT_CONNECTIONS].increment_by<uint64_t>(1); //increase manager connections
case ClientType::MAX:
default:
assert(false);
break;
}
return true;
}
bool VirtualServer::unregisterClient(shared_ptr<ConnectedClient> cl, std::string reason, std::unique_lock<std::shared_mutex>& chan_tree_lock) {
if(cl->getType() == ClientType::CLIENT_TEAMSPEAK || cl->getType() == ClientType::CLIENT_TEASPEAK || cl->getType() == ClientType::CLIENT_WEB) {
sassert(cl->state == ConnectionState::DISCONNECTED);
bool VirtualServer::unregisterClient(shared_ptr<ConnectedClient> client, std::string reason, std::unique_lock<std::shared_mutex>& chan_tree_lock) {
if(client->getType() == ClientType::CLIENT_TEAMSPEAK || client->getType() == ClientType::CLIENT_TEASPEAK || client->getType() == ClientType::CLIENT_WEB) {
sassert(client->state == ConnectionState::DISCONNECTED);
}
auto client_id = cl->getClientId();
if(client_id == 0) {
return false; /* not registered */
}
{
lock_guard lock(this->clients.lock);
if(client_id >= this->clients.clients.size()) {
logCritical(this->getServerId(), "Client {} ({}|{}) has been registered, but client id exceed client id! Failed to unregister client.", cl->getDisplayName(), client_id, cl->getUid());
} else {
auto& client_container = this->clients.clients[client_id];
if(client_container != cl) {
logCritical(this->getServerId(), "Client {} ({}|{}) has been registered, but container hasn't client set! Failed to unregister client.", cl->getDisplayName(), client_id, cl->getUid());
} else {
client_container.reset();
this->clients.count--;
}
if(!chan_tree_lock.owns_lock()) {
chan_tree_lock.lock();
}
if(client->currentChannel) {
//We dont have to make him invisible if he hasnt even a channel
this->client_move(client, nullptr, nullptr, reason, ViewReasonId::VREASON_SERVER_LEFT, false, chan_tree_lock);
}
chan_tree_lock.unlock();
}
{
std::lock_guard clients_lock{this->clients_mutex};
auto client_id = client->getClientId();
if(client_id == 0) {
return false; /* not registered */
}
if(!this->clients.erase(client_id)) {
client->setClientId(0);
logError(this->getServerId(), "Tried to unregister a not registered client {}/{} ({})", client->getDisplayName(), client->getUid(), client_id);
return false;
}
client->setClientId(0);
}
auto current_time_seconds = std::chrono::duration_cast<seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
switch(cl->getType()) {
switch(client->getType()) {
case ClientType::CLIENT_TEAMSPEAK:
case ClientType::CLIENT_TEASPEAK:
case ClientType::CLIENT_WEB:
@ -134,65 +154,20 @@ bool VirtualServer::unregisterClient(shared_ptr<ConnectedClient> cl, std::string
break;
}
{
if(!chan_tree_lock.owns_lock()) {
chan_tree_lock.lock();
}
if(cl->currentChannel) {
//We dont have to make him invisible if he hasnt even a channel
this->client_move(cl, nullptr, nullptr, reason, ViewReasonId::VREASON_SERVER_LEFT, false, chan_tree_lock);
}
}
serverInstance->databaseHelper()->saveClientPermissions(this->ref(), cl->getClientDatabaseId(), cl->clientPermissions);
cl->setClientId(0);
serverInstance->databaseHelper()->saveClientPermissions(this->ref(), client->getClientDatabaseId(), client->clientPermissions);
return true;
}
void VirtualServer::registerInternalClient(std::shared_ptr<ConnectedClient> client) {
client->state = ConnectionState::CONNECTED;
{
lock_guard lock(this->clients.lock);
if(client->getClientId() > 0) {
logCritical(this->getServerId(), "Internal client {} ({}|{}) has been already registered!", client->getDisplayName(), client->getClientId(), client->getUid());
return;
}
ClientId client_id = 0;
ClientId max_client_id = this->clients.clients.size();
while(client_id < max_client_id && this->clients.clients[client_id])
client_id++;
if(client_id == max_client_id)
this->clients.clients.push_back(client);
else
this->clients.clients[client_id] = client;
this->clients.clients[client_id] = client;
this->clients.count++;
client->setClientId(client_id);
}
this->registerClient(client);
}
void VirtualServer::unregisterInternalClient(std::shared_ptr<ConnectedClient> client) {
client->state = ConnectionState::DISCONNECTED;
{
auto client_id = client->getClientId();
lock_guard lock(this->clients.lock);
if(client_id >= this->clients.clients.size()) {
logCritical(this->getServerId(), "Client {} ({}|{}) has been registered, but client id exceed client id! Failed to unregister internal client.", client->getDisplayName(), client_id, client->getUid());
} else {
auto& client_container = this->clients.clients[client_id];
if(client_container != client) {
logCritical(this->getServerId(), "Client {} ({}|{}) has been registered, but container hasn't client set! Failed to unregister internal client.", client->getDisplayName(), client_id, client->getUid());
} else {
this->clients.count--;
client_container.reset();
}
}
}
std::unique_lock tree_lock{this->channel_tree_mutex};
this->unregisterClient(client, "internal disconnect", tree_lock);
}
bool VirtualServer::assignDefaultChannel(const shared_ptr<ConnectedClient>& client, bool join) {

View File

@ -126,18 +126,9 @@ void VirtualServer::executeServerTick() {
tick_client_begin = tick_client_end;
if(cl->server != this) {
logError(this->getServerId(), "Got registered client, but client does not think hes bound to this server!");
{
lock_guard lock(this->clients.lock);
for(auto& client : this->clients.clients) {
if(client != cl) continue;
client.reset();
this->clients.count--;
break;
}
}
continue; //Fully ha?
std::unique_lock tree_lock{this->channel_tree_mutex};
this->unregisterClient(cl, "invalid server handle", tree_lock);
continue;
}
if(cl->floodPoints > flood_block){

View File

@ -458,17 +458,6 @@ bool VirtualServer::start(std::string& error) {
this->serverRoot->server = self.lock();
this->serverAdmin->server = self.lock();
{ //Client delete after server stop/start
lock_guard lock(this->clients.lock);
for(auto& client : this->clients.clients) {
if(!client) continue;
if(client->getType() == ClientType::CLIENT_WEB || client->getType() == ClientType::CLIENT_TEAMSPEAK) {
client.reset();
}
}
}
auto host = this->properties()[property::VIRTUALSERVER_HOST].value();
if(config::binding::enforce_default_voice_host)
host = config::binding::DefaultVoiceHost;
@ -691,45 +680,39 @@ void VirtualServer::stop(const std::string& reason, bool disconnect_query) {
}
size_t VirtualServer::onlineClients() {
size_t result = 0;
size_t result{0};
lock_guard lock(this->clients.lock);
for(const auto &cl : this->clients.clients) {
if(!cl)
continue;
if(cl->getType() == CLIENT_TEAMSPEAK || cl->getType() == CLIENT_QUERY)
for(const auto& client : this->getClients()) {
if(client->getType() == CLIENT_TEAMSPEAK || client->getType() == CLIENT_QUERY) {
result++;
}
}
return result;
}
OnlineClientReport VirtualServer::onlineStats() {
OnlineClientReport response{};
{
lock_guard lock(this->clients.lock);
for(const auto &cl : this->clients.clients) {
if(!cl) continue;
switch (cl->getType()) {
case CLIENT_TEAMSPEAK:
case CLIENT_TEASPEAK:
response.clients_ts++;
break;
case CLIENT_WEB:
response.clients_web++;
break;
case CLIENT_QUERY:
response.queries++;
break;
case CLIENT_MUSIC:
response.bots++;
break;
case CLIENT_INTERNAL:
case MAX:
default:
break;
}
for(const auto &client : this->getClients()) {
switch (client->getType()) {
case CLIENT_TEAMSPEAK:
case CLIENT_TEASPEAK:
response.clients_ts++;
break;
case CLIENT_WEB:
response.clients_web++;
break;
case CLIENT_QUERY:
response.queries++;
break;
case CLIENT_MUSIC:
response.bots++;
break;
case CLIENT_INTERNAL:
case MAX:
default:
break;
}
}
@ -737,37 +720,38 @@ OnlineClientReport VirtualServer::onlineStats() {
}
std::shared_ptr<ConnectedClient> VirtualServer::find_client_by_id(uint16_t client_id) {
lock_guard lock(this->clients.lock);
if(this->clients.clients.size() > client_id)
return this->clients.clients[client_id];
else
std::lock_guard lock{this->clients_mutex};
auto it = this->clients.find(client_id);
if(it == this->clients.end()) {
return nullptr;
} else {
return it->second;
}
}
deque<shared_ptr<ConnectedClient>> VirtualServer::findClientsByCldbId(uint64_t cldbId) {
deque<shared_ptr<ConnectedClient>> result;
std::deque<shared_ptr<ConnectedClient>> result;
lock_guard lock(this->clients.lock);
for(const auto &client : this->clients.clients) {
if(!client) continue;
if(client->getClientDatabaseId() == cldbId)
std::lock_guard lock{this->clients_mutex};
for(const auto& [_, client] : this->clients) {
if(client->getClientDatabaseId() == cldbId) {
result.push_back(client);
}
}
return result;
}
deque<shared_ptr<ConnectedClient>> VirtualServer::findClientsByUid(std::string uid) {
lock_guard lock(this->clients.lock);
deque<shared_ptr<ConnectedClient>> result;
for(const auto &client : this->clients.clients) {
if(!client) continue;
std::deque<shared_ptr<ConnectedClient>> result;
std::lock_guard lock{this->clients_mutex};
for(const auto& [_, client] : this->clients) {
if(client->getUid() == uid) {
result.push_back(client);
}
}
return result;
}
@ -776,22 +760,18 @@ std::shared_ptr<ConnectedClient> VirtualServer::findClient(std::string name, boo
std::transform(name.begin(), name.end(), name.begin(), ::tolower);
}
{
std::lock_guard lock{this->clients_mutex};
for(const auto& [_, client] : this->clients) {
string clName = client->getDisplayName();
if(ignoreCase) {
std::transform(clName.begin(), clName.end(), clName.begin(), ::tolower);
}
lock_guard lock(this->clients.lock);
for(const auto& client : this->clients.clients) {
if(!client) continue;
string clName = client->getDisplayName();
if(ignoreCase) {
std::transform(clName.begin(), clName.end(), clName.begin(), ::tolower);
}
if(clName == name)
return client;
if(clName == name) {
return client;
}
}
return nullptr;
}
@ -809,19 +789,16 @@ bool VirtualServer::forEachClient(std::function<void(std::shared_ptr<ConnectedCl
}
std::vector<std::shared_ptr<ConnectedClient>> VirtualServer::getClients() {
vector<shared_ptr<ConnectedClient>> clients;
std::vector<shared_ptr<ConnectedClient>> result{};
{
lock_guard lock(this->clients.lock);
clients.reserve(this->clients.count);
std::lock_guard lock{this->clients_mutex};
result.reserve(this->clients.size());
for(auto& client : this->clients.clients) {
if(!client) continue;
clients.push_back(client);
}
for(const auto& [_, client] : this->clients) {
result.push_back(client);
}
return clients;
return result;
}
/* Note: This method **should** not lock the channel tree else we've a lot to do! */

View File

@ -21,6 +21,7 @@
#include "manager/LetterManager.h"
#include "Configuration.h"
#include "protocol/ringbuffer.h"
#include "absl/btree/map.h"
#include <misc/task_executor.h>
#include <tomcrypt.h>
@ -325,11 +326,8 @@ namespace ts {
std::chrono::system_clock::time_point conversation_cache_cleanup_timestamp;
//The client list
struct {
size_t count = 0;
std::mutex lock;
std::vector<std::shared_ptr<ConnectedClient>> clients;
} clients;
std::mutex clients_mutex{};
btree::map<ClientId, std::shared_ptr<ConnectedClient>> clients{};
std::recursive_mutex client_nickname_lock;

View File

@ -901,7 +901,10 @@ private:
// allocator_type& alloc = allocator();
// allocator_traits::destroy(alloc, v);
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wclass-memaccess"
assert(memcpy(v, zero_value, sizeof(value_type)));
#pragma GCC diagnostic pop
}
void destroy_value(int i) {

View File

@ -119,7 +119,8 @@ command_result SpeakingClient::applyClientInitParameters(Command &cmd) {
this->properties()[property::CLIENT_NICKNAME_PHONETIC] = name;
} else if(key == "client_version" || key == "client_platform") {
auto value = cmd[key].string();
if(value.length() > 64) {
if(value.length() > 512) {
/* The web client uses the full browser string which might be a bit longer */
return command_result{error::client_hacked};
}

View File

@ -3353,10 +3353,10 @@ enum struct FeatureSupportMode {
DEPRECATED
};
#define REGISTER_FEATURE(name, support, version) \
notify.put_unchecked(index, "name", name); \
notify.put_unchecked(index, "support", (int) support); \
notify.put_unchecked(index, "version", version); \
#define REGISTER_FEATURE(name, support, version) \
notify.put_unchecked(index, "name", name); \
notify.put_unchecked(index, "support", (int) support); \
notify.put_unchecked(index, "version", version); \
index++
command_result ConnectedClient::handleCommandListFeatureSupport(ts::Command &cmd) {

View File

@ -7,6 +7,7 @@
#include <misc/std_unique_ptr.h>
#include <log/LogUtils.h>
#include "../../groups/GroupAssignmentManager.h"
#include "../../server/GlobalNetworkEvents.h"
using namespace std;
using namespace std::chrono;
@ -66,8 +67,8 @@ void QueryClient::initialize_weak_reference(const std::shared_ptr<ConnectedClien
std::make_unique<QueryClientCommandHandler>(dynamic_pointer_cast<QueryClient>(self))
);
this->event_read = event_new(this->handle->event_io_loop, this->client_file_descriptor, EV_READ | EV_PERSIST, QueryClient::handle_event_read, this);
this->event_write = event_new(this->handle->event_io_loop, this->client_file_descriptor, EV_WRITE, QueryClient::handle_event_write, this);
this->event_read = serverInstance->network_event_loop()->allocate_event(this->client_file_descriptor, EV_READ | EV_PERSIST, QueryClient::handle_event_read, this, nullptr);
this->event_write = serverInstance->network_event_loop()->allocate_event(this->client_file_descriptor, EV_WRITE, QueryClient::handle_event_write, this, nullptr);
}
QueryClient::~QueryClient() {

View File

@ -72,10 +72,12 @@ command_result VoiceClient::handleCommand(ts::Command &command) {
if(!this->voice_server) return command_result{error::server_unbound};
if(this->state == ConnectionState::INIT_HIGH && this->handshake.state == HandshakeState::SUCCEEDED) {
if(command.command() == "clientinit")
if(command.command() == "clientinit") {
return this->handleCommandClientInit(command);
} else if(command.command() == "clientdisconnect")
}
} else if(command.command() == "clientdisconnect") {
return this->handleCommandClientDisconnect(command);
}
return SpeakingClient::handleCommand(command);
}

View File

@ -1,4 +1,3 @@
#include <algorithm>
#include <log/LogUtils.h>
#include <misc/memtracker.h>
#include <protocol/Packet.h>
@ -6,7 +5,6 @@
#include "../../server/VoiceServer.h"
#include "./VoiceClientConnection.h"
#include "./VoiceClient.h"
//#define LOG_AUTO_ACK_AUTORESPONSE
@ -76,16 +74,11 @@ void VoiceClientConnection::triggerWrite() {
}
}
void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& buffer) {
ClientPacketParser packet_parser{buffer};
if(!packet_parser.valid()) {
return;
}
void VoiceClientConnection::handle_incoming_datagram(protocol::ClientPacketParser& packet_parser) {
#ifndef CONNECTION_NO_STATISTICS
if(this->current_client) {
auto stats = this->current_client->connectionStatistics;
stats->logIncomingPacket(stats::ConnectionStatistics::category::from_type(packet_parser.type()), buffer.length() + 96); /* 96 for the UDP packet overhead */
stats->logIncomingPacket(stats::ConnectionStatistics::category::from_type(packet_parser.type()), packet_parser.buffer().length() + 96); /* 96 for the UDP packet overhead */
}
this->packet_statistics().received_packet((protocol::PacketType) packet_parser.type(), packet_parser.full_packet_id());
#endif
@ -197,8 +190,8 @@ void VoiceClientConnection::callback_command_decoded(void *ptr_this, Reassembled
connection->handlePacketCommand(std::exchange(command, nullptr));
}
bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer /* incl. mac etc */) {
return this->packet_decoder_.verify_encryption_client_packet(buffer);
bool VoiceClientConnection::verify_encryption(const protocol::ClientPacketParser& packet) {
return this->packet_decoder_.verify_encryption_client_packet(packet);
}
std::shared_ptr<ts::server::VoiceClient> VoiceClientConnection::getCurrentClient() {

View File

@ -81,8 +81,8 @@ namespace ts {
[[nodiscard]] inline auto& ping_handler() { return this->ping_handler_; }
[[nodiscard]] inline auto& crypt_setup_handler() { return this->crypt_setup_handler_; }
protected:
void handle_incoming_datagram(const pipes::buffer_view &buffer);
bool verify_encryption(const pipes::buffer_view& /* full packet */);
void handle_incoming_datagram(protocol::ClientPacketParser& /* packet */);
bool verify_encryption(const protocol::ClientPacketParser& /* packet */);
void triggerWrite();
private:

View File

@ -10,59 +10,61 @@ using namespace ts;
using namespace ts::server;
using namespace ts::protocol;
void WebClient::handleMessageWrite(int fd, short, void *) {
auto self_lock = this->ref();
void WebClient::handleMessageWrite(int fd, short, void *ptr_client) {
auto client = dynamic_pointer_cast<WebClient>(((WebClient*) ptr_client)->ref());
assert(client);
unique_lock buffer_lock(this->queue_mutex);
if(this->queue_write.empty()) return;
unique_lock buffer_lock(client->queue_mutex);
if(client->queue_write.empty()) return;
auto buffer = this->queue_write[0];
this->queue_write.pop_front();
auto buffer = client->queue_write[0];
client->queue_write.pop_front();
auto written = send(fd, buffer.data_ptr(), buffer.length(), MSG_NOSIGNAL | MSG_DONTWAIT);
if(written == -1) {
buffer_lock.unlock();
if (errno == EINTR || errno == EAGAIN) {
lock_guard event_lock(this->event_mutex);
if(this->writeEvent)
event_add(this->writeEvent, nullptr);
lock_guard event_lock(client->event_mutex);
if(client->writeEvent)
event_add(client->writeEvent, nullptr);
return;
} else {
//new ServerConnection(globalClient).startConnection({ host: "localhost", port: 9987}, new HandshakeHandler(profiles.default_profile(), "test"))
{
std::lock_guard event_lock{this->event_mutex};
if(this->writeEvent) {
event_del_noblock(this->writeEvent);
event_free(this->writeEvent);
this->writeEvent = nullptr;
std::lock_guard event_lock{client->event_mutex};
if(client->writeEvent) {
event_del_noblock(client->writeEvent);
event_free(client->writeEvent);
client->writeEvent = nullptr;
}
}
debugMessage(this->getServerId(), "[{}] Failed to write message (length {}, errno {}, message {}) Disconnecting client.", CLIENT_STR_LOG_PREFIX, written, errno, strerror(errno));
debugMessage(client->getServerId(), "[{}] Failed to write message (length {}, errno {}, message {}) Disconnecting client.", client->getLoggingPrefix(), written, errno, strerror(errno));
}
this->close_connection(system_clock::now()); /* close connection in a new thread */
client->close_connection(system_clock::now()); /* close connection in a new thread */
return;
}
if(written < buffer.length()) {
buffer = buffer.range((size_t) written); /* get the overhead */
this->queue_write.push_front(buffer);
client->queue_write.push_front(buffer);
}
if(this->queue_write.empty())
if(client->queue_write.empty())
return;
/* reschedule new write */
buffer_lock.unlock();
lock_guard event_lock(this->event_mutex);
if(this->writeEvent)
event_add(this->writeEvent, nullptr);
lock_guard event_lock(client->event_mutex);
if(client->writeEvent)
event_add(client->writeEvent, nullptr);
}
void WebClient::handleMessageRead(int fd, short, void *) {
auto self_lock = this->ref();
void WebClient::handleMessageRead(int fd, short, void *ptr_client) {
auto client = dynamic_pointer_cast<WebClient>(((WebClient*) ptr_client)->ref());
assert(client);
size_t buffer_length = 1024 * 4;
uint8_t buffer[buffer_length];
@ -72,14 +74,14 @@ void WebClient::handleMessageRead(int fd, short, void *) {
if(errno == EINTR || errno == EAGAIN)
;
else {
debugMessage(this->getServerId(), "[{}] Failed to read message (length {}, errno {}, message: {}). Closing connection.", CLIENT_STR_LOG_PREFIX, length, errno, strerror(errno));
debugMessage(client->getServerId(), "[{}] Failed to read message (length {}, errno {}, message: {}). Closing connection.", client->getLoggingPrefix(), length, errno, strerror(errno));
{
lock_guard lock(this->event_mutex);
if(this->readEvent)
event_del_noblock(this->readEvent);
lock_guard lock(client->event_mutex);
if(client->readEvent)
event_del_noblock(client->readEvent);
}
self_lock->close_connection(system_clock::now()); /* direct close, but from another thread */
client->close_connection(system_clock::now()); /* direct close, but from another thread */
}
return;
}
@ -87,7 +89,7 @@ void WebClient::handleMessageRead(int fd, short, void *) {
auto command = command::ReassembledCommand::allocate((size_t) length);
memcpy(command->command(), buffer, (size_t) length);
this->command_queue->enqueue_command_execution(command);
client->command_queue->enqueue_command_execution(command);
}
void WebClient::enqueue_raw_packet(const pipes::buffer_view &msg) {

View File

@ -12,6 +12,7 @@
#include <misc/std_unique_ptr.h>
#include <src/client/SpeakingClient.h>
#include "../../manager/ActionLogger.h"
#include "../../server/GlobalNetworkEvents.h"
#if defined(TCP_CORK) && !defined(TCP_NOPUSH)
#define TCP_NOPUSH TCP_CORK
@ -43,14 +44,12 @@ void WebClient::initialize() {
int enabled = 1;
int disabled = 0;
setsockopt(this->file_descriptor, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled));
if(setsockopt(this->file_descriptor, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0)
if(setsockopt(this->file_descriptor, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) {
logError(this->getServerId(), "{} Cant disable nopush! system error: {} => {}", CLIENT_STR_LOG_PREFIX, errno, strerror(errno));
}
auto event_loop = serverInstance->getWebIoLoop()->next_loop();
this->readEvent = event_new(event_loop->loop, this->file_descriptor, EV_READ|EV_PERSIST, [](int a, short b, void* c){ ((WebClient*) c)->handleMessageRead(a, b, c); }, this);
this->writeEvent = event_new(event_loop->loop, this->file_descriptor, EV_WRITE, [](int a, short b, void* c){ ((WebClient*) c)->handleMessageWrite(a, b, c); }, this);
this->readEvent = serverInstance->network_event_loop()->allocate_event(this->file_descriptor, EV_READ | EV_PERSIST, WebClient::handleMessageRead, this, nullptr);
this->writeEvent = serverInstance->network_event_loop()->allocate_event(this->file_descriptor, EV_WRITE, WebClient::handleMessageWrite, this, nullptr);
{
this->ws_handler.direct_process(pipes::PROCESS_DIRECTION_IN, true);
@ -281,8 +280,9 @@ command_result WebClient::handleCommand(Command &command) {
if(this->connectionState() == ConnectionState::INIT_HIGH && this->handshake.state == HandshakeState::SUCCEEDED){
if(command.command() == "clientinit") {
auto result = this->handleCommandClientInit(command);
if(result.has_error())
if(result.has_error()) {
this->close_connection(system_clock::now() + seconds(1));
}
return result;
}
}
@ -400,10 +400,11 @@ void WebClient::disconnectFinal() {
lock_guard lock(this->execute_mutex);
}
if(this->flush_thread.get_id() == this_thread::get_id())
if(this->flush_thread.get_id() == this_thread::get_id()) {
this->flush_thread.detach();
else
} else {
assert(!this->flush_thread.joinable()); /* shall be already joined via closeConnection(...)*/
}
{
::event *event_read, *event_write;
@ -434,6 +435,7 @@ void WebClient::disconnectFinal() {
this->file_descriptor = -1;
}
this->state = ConnectionState::DISCONNECTED;
this->processLeave();
/* We do not finalize here since we might still try to send some data */
@ -551,8 +553,9 @@ bool WebClient::disconnect(const std::string &reason) {
}
command_result WebClient::handleCommandClientInit(Command &command) {
if(!config::server::clients::teaweb)
if(!config::server::clients::teaweb) {
return command_result{error::client_type_is_not_allowed, config::server::clients::teaweb_not_allowed_message};
}
return SpeakingClient::handleCommandClientInit(command);
}

View File

@ -83,8 +83,8 @@ namespace ts::server {
private:
void initialize();
void handleMessageRead(int, short, void*);
void handleMessageWrite(int, short, void*);
static void handleMessageRead(int, short, void*);
static void handleMessageWrite(int, short, void*);
void enqueue_raw_packet(const pipes::buffer_view& /* buffer */);
/* TODO: Put the message processing part into the IO loop and not into command processing! */

View File

@ -0,0 +1,119 @@
//
// Created by WolverinDEV on 15/04/2021.
//
#include "GlobalNetworkEvents.h"
#include <log/LogUtils.h>
#include <misc/threads.h>
using namespace ts::server;
namespace ts::server {
struct NetworkEventLoopUseList {
std::vector<NetworkEventLoop::EventLoopId> used_event_loops{};
};
}
NetworkEventLoop::NetworkEventLoop(size_t event_loop_size) : event_loop_size{event_loop_size} { }
NetworkEventLoop::~NetworkEventLoop() {
this->shutdown();
}
bool NetworkEventLoop::initialize() {
std::lock_guard lock{this->mutex};
while(this->event_loops.size() < this->event_loop_size) {
auto event_loop = new EventLoop{this->event_loop_id_index++};
event_loop->event_base = event_base_new();
if(!event_loop->event_base) {
logError(LOG_GENERAL, "Failed to allocate new event base.");
delete event_loop;
return false;
}
event_loop->dispatcher = std::thread{NetworkEventLoop::event_loop_dispatch, event_loop};
threads::name(event_loop->dispatcher, "network loop #" + std::to_string(event_loop->loop_id));
this->event_loops.push_back(event_loop);
}
return true;
}
void NetworkEventLoop::shutdown() {
std::unique_lock lock{this->mutex};
auto event_loops_ = std::move(this->event_loops);
lock.unlock();
for(const auto& loop : event_loops_) {
event_base_loopexit(loop->event_base, nullptr);
}
for(const auto& loop : event_loops_) {
if(!threads::timed_join(loop->dispatcher, std::chrono::seconds{15})) {
/* This will cause a memory corruption since the memory we're freeing will still be accessed */
logCritical(LOG_GENERAL, "Failed to join event loop {}. Detaching thread.", loop->loop_id);
loop->dispatcher.detach();
}
event_base_free(loop->event_base);
delete loop;
}
}
void NetworkEventLoop::free_use_list(NetworkEventLoopUseList *list) {
delete list;
}
event* NetworkEventLoop::allocate_event(int fd, short events, event_callback_fn callback, void *callback_data, NetworkEventLoopUseList **use_list) {
if(use_list && !*use_list) {
*use_list = new NetworkEventLoopUseList{};
}
std::lock_guard lock{this->mutex};
EventLoop* event_loop;
size_t try_count{0};
while(try_count++ < this->event_loops.size()) {
event_loop = this->event_loops[this->event_loop_index % this->event_loops.size()];
if(!use_list) {
/* we have our event loop */
break;
}
auto& used_loops = (*use_list)->used_event_loops;
if(std::find(used_loops.begin(), used_loops.end(), event_loop->loop_id) == used_loops.end()) {
/* we haven't yet used that event loop */
break;
}
}
if(try_count >= this->event_loops.size()) {
/* We've no event loop to put the event in */
return nullptr;
}
auto event = event_new(event_loop->event_base, fd, events, callback, callback_data);
if(!event) {
/* failed to allocate the new event */
return nullptr;
}
this->event_loop_index++;
if(use_list) {
(*use_list)->used_event_loops.push_back(event_loop->loop_id);
}
return event;
}
void NetworkEventLoop::event_loop_dispatch(EventLoop *event_loop) {
debugMessage(LOG_GENERAL, "Network event loop {} started.", event_loop->loop_id);
auto result = event_base_loop(event_loop->event_base, EVLOOP_NO_EXIT_ON_EMPTY);
if(result < 0) {
logError(LOG_GENERAL, "Network event loop exited due to an error.");
} else if(result == 0) {
debugMessage(LOG_GENERAL, "Network event loop {} exited.", event_loop->loop_id);
} else if(result > 0) {
logError(LOG_GENERAL, "Network event loop exited because of no pending events. This should not happen!");
}
}

View File

@ -0,0 +1,58 @@
//
// Created by WolverinDEV on 15/04/2021.
//
#pragma once
#include <mutex>
#include <thread>
#include <vector>
#include <event.h>
namespace ts::server {
struct NetworkEventLoopUseList;
class NetworkEventLoop {
public:
typedef uint32_t EventLoopId;
explicit NetworkEventLoop(size_t /* thread pool size */);
~NetworkEventLoop();
[[nodiscard]] bool initialize();
void shutdown();
[[nodiscard]] inline size_t loop_count() const { return this->event_loop_size; }
/**
* Allocate a new event on the network event loop.
* @param fd
* @param events
* @param callback
* @param callback_arg
* @return `nullptr` if an error occurred and an even otherwise
*/
[[nodiscard]] struct event* allocate_event(
evutil_socket_t /* fd */,
short /* events */,
event_callback_fn /* callback */,
void */* callback_arg */,
NetworkEventLoopUseList** /* containing all loops the event has already been bound to */
);
void free_use_list(NetworkEventLoopUseList* /* use list */);
private:
struct EventLoop {
const EventLoopId loop_id;
struct event_base* event_base{nullptr};
std::thread dispatcher{};
};
size_t event_loop_size;
std::mutex mutex{};
EventLoopId event_loop_id_index{1};
size_t event_loop_index{0};
std::vector<EventLoop*> event_loops{};
static void event_loop_dispatch(EventLoop*);
};
}

View File

@ -12,6 +12,7 @@
#include <src/InstanceHandler.h>
#include <ThreadPool/ThreadHelper.h>
#include <log/LogUtils.h>
#include "./GlobalNetworkEvents.h"
using namespace std;
using namespace std::chrono;
@ -75,25 +76,6 @@ bool QueryServer::start(const deque<shared_ptr<QueryServer::Binding>> &bindings_
}
}
/* setup event bases */
{
this->event_io_loop = event_base_new();
this->event_io_thread = std::thread{[&]{
while(this->active) {
debugMessage(LOG_QUERY, "Entering event loop ({})", (void*) this->event_io_loop);
event_base_loop(this->event_io_loop, EVLOOP_NO_EXIT_ON_EMPTY);
if(this->active) {
debugMessage(LOG_QUERY, "Event loop exited ({}). No active events. Sleeping 1 seconds", (void*) this->event_io_loop);
this_thread::sleep_for(seconds(1));
} else {
debugMessage(LOG_QUERY, "Event loop exited ({})", (void*) this->event_io_loop);
}
}
}};
threads::name(this->event_io_thread, "query io");
}
for(auto& binding : bindings_) {
binding->file_descriptor = socket(binding->address.ss_family, (unsigned) SOCK_STREAM | (unsigned) SOCK_NONBLOCK, 0);
if(binding->file_descriptor < 0) {
@ -134,7 +116,13 @@ bool QueryServer::start(const deque<shared_ptr<QueryServer::Binding>> &bindings_
continue;
}
binding->event_accept = event_new(this->event_io_loop, binding->file_descriptor, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryServer *) c)->on_client_receive(a, b, c); }, this);
binding->event_accept = serverInstance->network_event_loop()->allocate_event(binding->file_descriptor, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryServer *) c)->on_client_receive(a, b, c); }, this, nullptr);
if(!binding->event_accept) {
logError(LOG_QUERY, "Failed to allocate accept event for query binding", binding->as_string());
close(binding->file_descriptor);
continue;
}
event_add(binding->event_accept, nullptr);
this->bindings.push_back(binding);
}
@ -227,17 +215,6 @@ void QueryServer::stop() {
}
}
/* 5. Shutdown the io event loop */
if(this->event_io_loop) {
event_base_loopexit(this->event_io_loop, nullptr);
}
threads::save_join(this->event_io_thread, false);
if(this->event_io_loop) {
event_base_free(this->event_io_loop);
this->event_io_loop = nullptr;
}
/* 6. Cleanup the servers reserve file descriptor */
if(this->server_reserve_fd > 0) {
if(close(this->server_reserve_fd) < 0) {
@ -319,7 +296,9 @@ inline void send_direct_disconnect(const sockaddr_storage& address, int file_des
//dummyfdflood
//dummyfdflood clear
void QueryServer::on_client_receive(int server_file_descriptor, short, void *) {
void QueryServer::on_client_receive(int server_file_descriptor, short, void *ptr_server) {
auto query_server = (QueryServer*) ptr_server;
sockaddr_storage remote_address{};
memset(&remote_address, 0, sizeof(sockaddr_in));
socklen_t address_length = sizeof(remote_address);
@ -339,16 +318,16 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) {
bool tmp_close_success{false};
{
lock_guard reserve_fd_lock(server_reserve_fd_lock);
if(this->server_reserve_fd > 0) {
lock_guard reserve_fd_lock(query_server->server_reserve_fd_lock);
if(query_server->server_reserve_fd > 0) {
debugMessage(LOG_QUERY, "Trying to accept client with the reserved file descriptor to send him a protocol limit reached exception.");
auto _ = [&]{
if(close(this->server_reserve_fd) < 0) {
if(close(query_server->server_reserve_fd) < 0) {
debugMessage(LOG_QUERY, "Failed to close reserved file descriptor");
tmp_close_success = false;
return;
}
this->server_reserve_fd = 0;
query_server->server_reserve_fd = 0;
errno = 0;
client_file_descriptor = accept(server_file_descriptor, (struct sockaddr *) &remote_address, &address_length);
@ -360,8 +339,8 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) {
} else {
debugMessage(LOG_QUERY, "[{}] Failed to accept client with reserved file descriptor. ({} | {})", logging_address(remote_address), errno, strerror(errno));
}
this->server_reserve_fd = dup(1);
if(this->server_reserve_fd < 0) {
query_server->server_reserve_fd = dup(1);
if(query_server->server_reserve_fd < 0) {
debugMessage(LOG_QUERY, "[{}] Failed to reclaim reserved file descriptor. Future clients cant be accepted!", logging_address(remote_address));
} else {
tmp_close_success = true;
@ -373,8 +352,8 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) {
static auto resource_limit_error = R"(error id=57344 msg=query\sserver\sresource\slimit\sreached extra_msg=file\sdescriptor\slimit\sexceeded)";
send_direct_disconnect(remote_address, client_file_descriptor, resource_limit_error, strlen(resource_limit_error));
this->server_reserve_fd = dup(1);
if(this->server_reserve_fd < 0) {
query_server->server_reserve_fd = dup(1);
if(query_server->server_reserve_fd < 0) {
debugMessage(LOG_QUERY, "Failed to reclaim reserved file descriptor. Future clients cant be accepted!");
} else {
tmp_close_success = true;
@ -387,10 +366,10 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) {
if(!tmp_close_success) {
debugMessage(LOG_QUERY, "Sleeping two seconds because we're currently having no resources for this user. (Removing the accept event)");
for(auto& binding : this->bindings) {
for(auto& binding : query_server->bindings) {
event_del_noblock(binding->event_accept);
}
accept_event_deleted = system_clock::now();
query_server->accept_event_deleted = system_clock::now();
return;
}
return;
@ -400,9 +379,9 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) {
}
{
unique_lock lock{this->connected_clients_mutex};
unique_lock lock{query_server->connected_clients_mutex};
auto max_connections = serverInstance->properties()[property::SERVERINSTANCE_QUERY_MAX_CONNECTIONS].as_unchecked<size_t>();
if(max_connections > 0 && max_connections <= this->connected_clients.size()) {
if(max_connections > 0 && max_connections <= query_server->connected_clients.size()) {
lock.unlock();
logMessage(LOG_QUERY, "[{}] Dropping new query connection attempt because of too many connected query clients.", logging_address(remote_address));
static auto query_server_full = R"(error id=4611 msg=max\sclients\sreached)";
@ -413,7 +392,7 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) {
auto max_ip_connections = serverInstance->properties()[property::SERVERINSTANCE_QUERY_MAX_CONNECTIONS_PER_IP].as_unchecked<size_t>();
if(max_ip_connections > 0) {
size_t connection_count = 0;
for(auto& client : this->connected_clients) {
for(auto& client : query_server->connected_clients) {
if(net::address_equal(client->remote_address, remote_address))
connection_count++;
}
@ -428,13 +407,13 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) {
}
}
auto client = std::make_shared<QueryClient>(this, client_file_descriptor);
auto client = std::make_shared<QueryClient>(query_server, client_file_descriptor);
memcpy(&client->remote_address, &remote_address, sizeof(remote_address));
client->initialize_weak_reference(client);
{
lock_guard lock(this->connected_clients_mutex);
this->connected_clients.push_back(client);
lock_guard lock(query_server->connected_clients_mutex);
query_server->connected_clients.push_back(client);
}
client->preInitialize();
@ -444,79 +423,6 @@ void QueryServer::on_client_receive(int server_file_descriptor, short, void *) {
logMessage(LOG_QUERY, "Got new client from {}", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()));
}
/*
#define QUERY_PASSWORD_LENGTH 7
std::string QueryServer::resetQueryPassword(const std::shared_ptr<QueryLoginCredentials>& credits, std::string password) {
if(password.empty()) password = rnd_string(QUERY_PASSWORD_LENGTH);
auto password_copy = password;
auto res = sql::command(this->sql, "UPDATE `queries` SET `password` = :password WHERE `uniqueId` = :uid AND `username` = :name", variable{":name", credits->username}, variable{":uid", credits->uniqueId}, variable{":password", password}).execute();
LOG_SQL_CMD(res);
return password_copy; //Analize why I have to copy that shit here?
}
std::string QueryServer::createQueryLogin(const string &name, ClientUid uid, string password) {
bool exists = false;
sql::command(this->sql, "SELECT * FROM `queries` WHERE `username` = :name", variable{":name", name}).query([](bool* flag, int, char**, char**){
*flag = true;
return 0;
}, &exists);
if(exists) return "";
if(password.empty())
password = rnd_string(QUERY_PASSWORD_LENGTH);
sql::command(this->sql, "INSERT INTO `queries` (`username`, `password`, `uniqueId`) VALUES (:name, :password, :uid)", variable{":name", name}, variable{":uid", uid}, variable{":password", password}).execute();
return password;
}
bool QueryServer::renameQueryLogin(ClientUid uid, const string &name) {
bool exists = false;
sql::command(this->sql, "SELECT * FROM `queries` WHERE `username` = :name", variable{":name", name}).query([](bool* flag, int, char**, char**){
*flag = true;
return 0;
}, &exists);
if(!exists) return false;
auto res = sql::command(this->sql, "UPDATE `queries` SET `username` = :name WHERE `uniqueId` = :uid", variable{":name", name}, variable{":uid", uid}).execute();
LOG_SQL_CMD(res);
return res;
}
std::shared_ptr<QueryLoginCredentials> QueryServer::findQueryLoginByName(const string &name) {
std::shared_ptr<QueryLoginCredentials> result = std::make_shared<QueryLoginCredentials>();
sql::command(this->sql, "SELECT * FROM `queries` WHERE `username` = :name", variable{":name", name}).query([](QueryLoginCredentials* res, int length, char** value, char** columns){
for(int index = 0; index < length; index++)
if(strcmp(columns[index], "username") == 0)
res->username = value[index];
else if(strcmp(columns[index], "password") == 0)
res->password = value[index];
else if(strcmp(columns[index], "uniqueId") == 0)
res->uniqueId = value[index];
return 0;
}, result.get());
if(result->username.empty()) return nullptr;
return result;
}
std::shared_ptr<QueryLoginCredentials> QueryServer::findQueryLoginByUid(const string &uid) {
std::shared_ptr<QueryLoginCredentials> result = std::make_shared<QueryLoginCredentials>();
sql::command(this->sql, "SELECT * FROM `queries` WHERE `uniqueId` = :name", variable{":name", uid}).query([](QueryLoginCredentials *res, int length, char **value, char **columns) {
for (int index = 0; index < length; index++)
if (strcmp(columns[index], "username") == 0)
res->username = value[index];
else if (strcmp(columns[index], "password") == 0)
res->password = value[index];
else if (strcmp(columns[index], "uniqueId") == 0)
res->uniqueId = value[index];
return 0;
}, result.get());
if (result->username.empty()) return nullptr;
return result;
}
*/
/* api */
inline deque<shared_ptr<QueryAccount>> query_accounts(sql::command& command) {
deque<shared_ptr<QueryAccount>> result;

View File

@ -84,10 +84,6 @@ namespace ts {
std::unique_ptr<IpListManager> ip_whitelist;
std::unique_ptr<IpListManager> ip_blacklist;
//IO stuff
event_base* event_io_loop{nullptr};
std::thread event_io_thread{};
std::chrono::system_clock::time_point accept_event_deleted;
std::mutex connected_clients_mutex{};
@ -107,7 +103,7 @@ namespace ts {
std::deque<std::weak_ptr<QueryClient>> tick_pending_connection_close{};
std::chrono::system_clock::time_point tick_next_client_timestamp{};
void on_client_receive(int server_file_descriptor, short ev, void *arg);
static void on_client_receive(int, short, void *);
void tick_clients();
void tick_executor();

View File

@ -4,19 +4,12 @@
#include <algorithm>
#include <unistd.h>
#include <fcntl.h>
#include "VoiceServer.h"
#include "../client/voice/VoiceClient.h"
#include "../Configuration.h"
#include <log/LogUtils.h>
#include "src/VirtualServer.h"
#include <misc/endianness.h>
#include "src/VirtualServerManager.h"
#include "../InstanceHandler.h"
#include <ThreadPool/Timer.h>
#include <pipes/buffer.h>
#include <netinet/in.h>
#include <misc/sassert.h>
#include "misc/timer.h"
using namespace std;
using namespace std::chrono;
@ -87,19 +80,20 @@ bool VoiceServer::start(const std::deque<std::shared_ptr<VoiceServerBinding>>& b
bind->file_descriptor = 0;
continue;
}
fcntl(bind->file_descriptor, F_SETFL, fcntl(bind->file_descriptor, F_GETFL, 0) | O_NONBLOCK);
}
{
auto bindings = this->activeBindings();
if(bindings.empty()) {
auto active_bindings = this->activeBindings();
if(active_bindings.empty()) {
error = "Failed to bind any address!";
this->running = false;
return false;
}
string str;
for(auto it = bindings.begin(); it != bindings.end(); it++) {
str += net::to_string((*it)->address) + (it + 1 == bindings.end() ? "" : " | ");
for(auto it = active_bindings.begin(); it != active_bindings.end(); it++) {
str += net::to_string((*it)->address) + (it + 1 == active_bindings.end() ? "" : " | ");
}
logMessage(this->server->getServerId(), "Started server on {}.", str);
}
@ -115,8 +109,8 @@ void VoiceServer::triggerWrite(const std::shared_ptr<VoiceClient>& client) {
return;
}
if(auto io{this->io}; io) {
io->invoke_write(client);
if(auto io_{this->io}; io_) {
io_->invoke_write(client);
}
}
@ -237,25 +231,27 @@ bool VoiceServer::unregisterConnection(std::shared_ptr<VoiceClient> connection)
}
static union {
char literal[8] = {'T', 'S', '3', 'I', 'N', 'I', 'T', '1'};
char literal[8]{'T', 'S', '3', 'I', 'N', 'I', 'T', '1'};
uint64_t integral;
} TS3INIT;
constexpr static auto kRecvBufferSize{1600}; //IPv6 MTU: 1500 | IPv4 MTU: 576
void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) {
(void) events;
auto event_handle = (io::IOEventLoopEntry*) _event_handle;
auto voice_server = event_handle->voice_server;
auto ts_server = event_handle->server;
size_t raw_read_buffer_length = event_handle->family == AF_INET ? 600 : 1600; //IPv6 MTU: 1500 | IPv4 MTU: 576
uint8_t raw_read_buffer[raw_read_buffer_length]; //Allocate on stack, so we dont need heap here
uint8_t raw_read_buffer[kRecvBufferSize]; //Allocate on stack, so we dont need heap here
ssize_t bytes_read = 0;
pipes::buffer_view read_buffer{raw_read_buffer, raw_read_buffer_length}; /* will not allocate anything, just sets its mode to ptr and thats it :) */
ssize_t bytes_read;
pipes::buffer_view read_buffer{raw_read_buffer, kRecvBufferSize}; /* will not allocate anything, just sets its mode to ptr and that's it :) */
sockaddr_storage remote_address{};
iovec io_vector{};
io_vector.iov_base = (void*) raw_read_buffer;
io_vector.iov_len = raw_read_buffer_length;
io_vector.iov_len = kRecvBufferSize;
char message_headers[0x100];
@ -267,13 +263,19 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) {
message.msg_control = message_headers;
message.msg_controllen = 0x100;
auto read_timeout = system_clock::now() + microseconds(2500); /* read 2.5ms long at a time or 'till nothing more is there */
auto read_timeout = system_clock::now() + microseconds{2500}; /* read 2.5ms long at a time or 'till nothing more is there */
while(system_clock::now() <= read_timeout){
message.msg_flags = 0;
bytes_read = recvmsg(fd, &message, 0);
if((message.msg_flags & MSG_TRUNC) > 0) {
logError(ts_server->getServerId(), "Received truncated message from {}", net::to_string(remote_address));
static std::chrono::system_clock::time_point last_error_message{};
auto now = system_clock::now();
if(last_error_message + std::chrono::seconds{5} < now) {
logError(ts_server->getServerId(), "Received truncated message from {}", net::to_string(remote_address));
last_error_message = now;
}
continue;
}
if(bytes_read < 0) {
@ -295,27 +297,36 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) {
continue;
}
std::shared_ptr<VoiceClient> client;
if(*(uint64_t*) raw_read_buffer == TS3INIT.integral) {
//Handle ddos protection...
voice_server->pow_handler->handle_datagram(event_handle->socket_id, remote_address, message, read_buffer.view(0, bytes_read));
continue;
}
protocol::ClientPacketParser packet_parser{read_buffer.view(0, bytes_read)};
if(!packet_parser.valid()) {
return;
}
std::shared_ptr<VoiceClient> client{};
{
if(*(uint64_t*) raw_read_buffer == TS3INIT.integral) {
//Handle ddos protection...
voice_server->pow_handler->handle_datagram(event_handle->socket_id, remote_address, message, read_buffer.view(0, bytes_read));
auto client_id = packet_parser.client_id();
if(client_id > 0) {
client = dynamic_pointer_cast<VoiceClient>(voice_server->server->find_client_by_id(client_id));
} else {
auto client_id = (ClientId) be2le16(&raw_read_buffer[10]);
if(client_id > 0) {
client = dynamic_pointer_cast<VoiceClient>(voice_server->server->find_client_by_id(client_id));
} else {
client = voice_server->findClient(&remote_address, true);
}
client = voice_server->findClient(&remote_address, true);
}
}
if(!client)
if(!client) {
continue;
}
if(memcmp(&client->remote_address, &remote_address, sizeof(sockaddr_storage)) != 0) { /* verify the remote address */
if((read_buffer[12] & 0x80) == 0 && client->state == ConnectionState::CONNECTED) { /* only encrypted packets are allowed */
if(client->connection->verify_encryption(read_buffer.view(0, bytes_read))) { /* the ip had changed */
/* only encrypted packets are allowed */
if(!packet_parser.has_flag(protocol::PacketFlag::Unencrypted) && client->state == ConnectionState::CONNECTED) {
/* the ip had changed */
if(client->connection->verify_encryption(packet_parser)) {
auto old_address = net::to_string(client->remote_address);
auto new_address = net::to_string(remote_address);
@ -329,8 +340,8 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) {
}
}
if(client->state != ConnectionState::DISCONNECTED){
client->connection->handle_incoming_datagram(read_buffer.view(0, bytes_read));
if(client->state != ConnectionState::DISCONNECTED) {
client->connection->handle_incoming_datagram(packet_parser);
client = nullptr;
}
}
@ -415,6 +426,8 @@ inline ssize_t write_datagram(IOData<MHS>& io, const sockaddr_storage& address,
}
void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) {
(void) events;
using WBufferPopResult = server::udp::PacketEncoder::BufferPopResult;
auto event_handle = (io::IOEventLoopEntry*) _event_handle;
auto voice_server = event_handle->voice_server;
@ -434,8 +447,9 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle)
while(system_clock::now() <= write_timeout){
if(!client) {
auto client_queue_state = event_handle->pop_voice_write_queue(client); /* we need a new client, the old client has nothing more to do */
if(client_queue_state == 2)
if(client_queue_state == 2) {
break;
}
assert(client);
more_clients = (bool) client_queue_state;
@ -525,6 +539,7 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle)
retrigger |= packet != nullptr; /* memory stored at packet is not accessible anymore. But anyways pop_dg_write_queue returns 0 if there is nothing more */
}
if(retrigger) {
event_add(event_handle->event_write, nullptr);
}

View File

@ -73,9 +73,9 @@ namespace ts {
std::recursive_mutex connectionLock;
std::deque<std::shared_ptr<VoiceClient>> activeConnections;
public:
void tickHandshakingClients();
void triggerWrite(const std::shared_ptr<VoiceClient> &);
void tickHandshakingClients();
void execute_resend(const std::chrono::system_clock::time_point& /* now */, std::chrono::system_clock::time_point& /* next resend */);
void send_datagram(int /* socket */, udp::DatagramPacket* /* packet */);

View File

@ -1,53 +0,0 @@
#include "log/LogUtils.h"
#include "WebIoManager.h"
#include "src/Configuration.h"
using namespace std;
using namespace ts::webio;
void EventLoop::invoker_loop() {
const timeval tick_timeout{10, 0};
const auto original_base = this->loop;
while(this->loop == original_base) {
event_base_loopexit(original_base, &tick_timeout);
event_base_dispatch(original_base);
}
debugMessage(LOG_GENERAL, "Event base dispatch of {} ended.", (void*) original_base);
}
LoopManager::LoopManager() {
lock_guard lock(this->loop_lock);
for(int i = 0; i < config::threads::web::io_loops; i++) {
auto loop = make_shared<EventLoop>();
loop->loop = event_base_new();
loop->invoker = make_unique<thread>(&EventLoop::invoker_loop, loop.get());
pthread_setname_np(loop->invoker->native_handle(), ("Web IO #" + to_string(i)).c_str());
this->loops.push_back(move(loop));
}
}
LoopManager::~LoopManager() {
lock_guard lock(this->loop_lock);
for(const auto& loop : this->loops) {
const auto base = loop->loop;
loop->loop = nullptr;
const timeval timeout_now{0, 1};
event_base_loopexit(base, &timeout_now);
if(loop->invoker && loop->invoker->joinable())
loop->invoker->join();
event_base_free(base);
}
}
std::shared_ptr<EventLoop> LoopManager::next_loop() {
lock_guard lock(this->loop_lock);
if(this->loops.empty())
return nullptr;
return this->loops[this->loop_index++ % this->loops.size()];
}

View File

@ -1,31 +0,0 @@
#pragma once
#include <event.h>
#include <thread>
#include <deque>
namespace ts {
namespace webio {
class LoopManager;
struct EventLoop {
friend class LoopManager;
public:
event_base* loop;
std::unique_ptr<std::thread> invoker;
private:
void invoker_loop();
};
class LoopManager {
public:
LoopManager();
~LoopManager();
std::shared_ptr<EventLoop> next_loop();
private:
std::atomic<size_t> loop_index{0};
std::mutex loop_lock;
std::deque<std::shared_ptr<EventLoop>> loops;
};
}
}

View File

@ -3,8 +3,8 @@
#include "WebServer.h"
#include "src/client/web/WebClient.h"
#include <netinet/tcp.h>
#include <src/client/ConnectedClient.h>
#include "src/InstanceHandler.h"
#include "./GlobalNetworkEvents.h"
using namespace std;
using namespace std::chrono;
@ -18,7 +18,7 @@ using namespace ts::server;
WebControlServer::WebControlServer(const std::shared_ptr<VirtualServer>& handle) : handle(handle) {}
WebControlServer::~WebControlServer() = default;
bool WebControlServer::start(const std::deque<std::shared_ptr<WebControlServer::Binding>>& bindings, std::string& error) {
bool WebControlServer::start(const std::deque<std::shared_ptr<WebControlServer::Binding>>& target_bindings, std::string& error) {
if(this->running()) {
error = "server already running";
return false;
@ -33,7 +33,7 @@ bool WebControlServer::start(const std::deque<std::shared_ptr<WebControlServer::
}
{
for(auto& binding : bindings) {
for(auto& binding : target_bindings) {
binding->file_descriptor = socket(binding->address.ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
if(binding->file_descriptor < 0) {
logError(this->handle->getServerId(), "[Web] Failed to bind server to {}. (Failed to create socket: {} | {})", binding->as_string(), errno, strerror(errno));
@ -67,9 +67,13 @@ bool WebControlServer::start(const std::deque<std::shared_ptr<WebControlServer::
}
auto io_base = serverInstance->getWebIoLoop()->next_loop();
assert(io_base);
binding->event_accept = event_new(io_base->loop, binding->file_descriptor, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((WebControlServer *) c)->on_client_receive(a, b, c); }, this);
binding->event_accept = serverInstance->network_event_loop()->allocate_event(binding->file_descriptor, EV_READ | EV_PERSIST, WebControlServer::on_client_receive, this, nullptr);
if(!binding->event_accept) {
logError(this->handle->getServerId(), "[Web] Failed to allocate network event for binding {}.", binding->as_string());
close(binding->file_descriptor);
continue;
}
event_add(binding->event_accept, nullptr);
this->bindings.push_back(binding);
}
@ -97,12 +101,14 @@ inline std::string logging_address(const sockaddr_storage& address) {
return net::to_string(address, true);
}
void WebControlServer::on_client_receive(int _server_file_descriptor, short ev, void *arg) {
void WebControlServer::on_client_receive(int server_file_descriptor_, short, void *ptr_server) {
auto server = (WebControlServer*) ptr_server;
sockaddr_storage remote_address{};
memset(&remote_address, 0, sizeof(remote_address));
socklen_t address_length = sizeof(remote_address);
int file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length);
int file_descriptor = accept(server_file_descriptor_, (struct sockaddr *) &remote_address, &address_length);
if (file_descriptor < 0) {
if(errno == EAGAIN) {
return;
@ -110,78 +116,78 @@ void WebControlServer::on_client_receive(int _server_file_descriptor, short ev,
if(errno == EMFILE || errno == ENFILE) {
if(errno == EMFILE) {
logError(this->handle->getServerId(), "[Web] Server ran out file descriptors. Please increase the process file descriptor limit.");
logError(server->handle->getServerId(), "[Web] Server ran out file descriptors. Please increase the process file descriptor limit.");
} else {
logError(this->handle->getServerId(), "[Web] Server ran out file descriptors. Please increase the process and system-wide file descriptor limit.");
logError(server->handle->getServerId(), "[Web] Server ran out file descriptors. Please increase the process and system-wide file descriptor limit.");
}
bool tmp_close_success = false;
{
lock_guard reserve_fd_lock(server_reserve_fd_lock);
if(this->server_reserve_fd > 0) {
debugMessage(this->handle->getServerId(), "[Web] Trying to accept client with the reserved file descriptor to close the incoming connection.");
lock_guard reserve_fd_lock(server->server_reserve_fd_lock);
if(server->server_reserve_fd > 0) {
debugMessage(server->handle->getServerId(), "[Web] Trying to accept client with the reserved file descriptor to close the incoming connection.");
auto _ = [&]{
if(close(this->server_reserve_fd) < 0) {
debugMessage(this->handle->getServerId(), "[Web] Failed to close reserved file descriptor");
if(close(server->server_reserve_fd) < 0) {
debugMessage(server->handle->getServerId(), "[Web] Failed to close reserved file descriptor");
tmp_close_success = false;
return;
}
this->server_reserve_fd = 0;
server->server_reserve_fd = 0;
errno = 0;
file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length);
file_descriptor = accept(server_file_descriptor_, (struct sockaddr *) &remote_address, &address_length);
if(file_descriptor < 0) {
if(errno == EMFILE || errno == ENFILE)
debugMessage(this->handle->getServerId(), "[Web] [{}] Even with freeing the reserved descriptor accept failed. Attempting to reclaim reserved file descriptor", logging_address(remote_address));
debugMessage(server->handle->getServerId(), "[Web] [{}] Even with freeing the reserved descriptor accept failed. Attempting to reclaim reserved file descriptor", logging_address(remote_address));
else if(errno == EAGAIN);
else {
debugMessage(this->handle->getServerId(), "[Web] [{}] Failed to accept client with reserved file descriptor. ({} | {})", logging_address(remote_address), errno, strerror(errno));
debugMessage(server->handle->getServerId(), "[Web] [{}] Failed to accept client with reserved file descriptor. ({} | {})", logging_address(remote_address), errno, strerror(errno));
}
this->server_reserve_fd = dup(1);
if(this->server_reserve_fd < 0)
debugMessage(this->handle->getServerId(), "[Web] [{}] Failed to reclaim reserved file descriptor. Future clients cant be accepted!", logging_address(remote_address));
server->server_reserve_fd = dup(1);
if(server->server_reserve_fd < 0)
debugMessage(server->handle->getServerId(), "[Web] [{}] Failed to reclaim reserved file descriptor. Future clients cant be accepted!", logging_address(remote_address));
else
tmp_close_success = true;
return;
}
debugMessage(this->handle->getServerId(), "[Web] [{}] Successfully accepted client via reserved descriptor (fd: {}). Disconnecting client.", logging_address(remote_address), file_descriptor);
debugMessage(server->handle->getServerId(), "[Web] [{}] Successfully accepted client via reserved descriptor (fd: {}). Disconnecting client.", logging_address(remote_address), file_descriptor);
CLOSE_CONNECTION
this->server_reserve_fd = dup(1);
if(this->server_reserve_fd < 0)
debugMessage(this->handle->getServerId(), "[Web] Failed to reclaim reserved file descriptor. Future clients cant be accepted!");
server->server_reserve_fd = dup(1);
if(server->server_reserve_fd < 0)
debugMessage(server->handle->getServerId(), "[Web] Failed to reclaim reserved file descriptor. Future clients cant be accepted!");
else
tmp_close_success = true;
logMessage(this->handle->getServerId(), "[Web] [{}] Dropping file transfer connection attempt because of too many open file descriptors.", logging_address(remote_address));
logMessage(server->handle->getServerId(), "[Web] [{}] Dropping file transfer connection attempt because of too many open file descriptors.", logging_address(remote_address));
};
_();
}
}
if(!tmp_close_success) {
debugMessage(this->handle->getServerId(), "[Web] Sleeping two seconds because we're currently having no resources for this user. (Removing the accept event)");
for(auto& binding : this->bindings)
debugMessage(server->handle->getServerId(), "[Web] Sleeping two seconds because we're currently having no resources for this user. (Removing the accept event)");
for(auto& binding : server->bindings)
event_del_noblock(binding->event_accept);
accept_event_deleted = system_clock::now();
server->accept_event_deleted = system_clock::now();
return;
}
return;
}
logMessage(this->handle->getServerId(), "[Web] Got an error while accepting a new client. (errno: {}, message: {})", errno, strerror(errno));
logMessage(server->handle->getServerId(), "[Web] Got an error while accepting a new client. (errno: {}, message: {})", errno, strerror(errno));
return;
}
auto client = std::make_shared<WebClient>(this, file_descriptor);
auto client = std::make_shared<WebClient>(server, file_descriptor);
memcpy(&client->remote_address, &remote_address, sizeof(remote_address));
client->initialize_weak_reference(client);
client->initialize();
this->clientLock.lock();
this->clients.push_back(client);
this->clientLock.unlock();
server->clientLock.lock();
server->clients.push_back(client);
server->clientLock.unlock();
event_add(client->readEvent, nullptr);
logMessage(this->handle->getServerId(), "[Web] Got new client from {}:{}", client->getLoggingPeerIp(), client->getPeerPort());
logMessage(server->handle->getServerId(), "[Web] Got new client from {}:{}", client->getLoggingPeerIp(), client->getPeerPort());
}
void WebControlServer::stop() {

View File

@ -50,7 +50,7 @@ namespace ts {
//IO stuff
std::chrono::system_clock::time_point accept_event_deleted;
private:
void on_client_receive(int fd, short ev, void *arg);
static void on_client_receive(int, short, void *);
void unregisterConnection(const std::shared_ptr<WebClient>&);
};
}

2
shared

@ -1 +1 @@
Subproject commit 0a73e4c10c4af3b6f982c0e2986f122e75bd20a8
Subproject commit 0726cd6c95ff5597bfc20ac2bb560ad03ace7b49