totally fucked up

This commit is contained in:
WolverinDEV 2020-03-17 12:08:07 +01:00
parent 58666b8906
commit d6f483a019
43 changed files with 1155 additions and 985 deletions

@ -1 +1 @@
Subproject commit 9a26231c1f6c44f799419f87a3cac37a55425bdb
Subproject commit cba03a6316db76fdf056a960ee509ad20542ea44

View File

@ -33,8 +33,8 @@ add_definitions(-DUSE_BORINGSSL)
#3 = PRIVATE
option(BUILD_TYPE "Sets the build type" OFF)
option(BUILD_TYPE_NAME "Sets the build type name" OFF)
option(COMPILE_WEB_CLIENT "Enable/Disable the web cleint future" OFF)
#set(COMPILE_WEB_CLIENT "ON")
option(COMPILE_WEB_CLIENT "Enable/Disable the web client future" OFF)
set(COMPILE_WEB_CLIENT "OFF") #FIXME!
set(CMAKE_VERBOSE_MAKEFILE ON)
set(SERVER_SOURCE_FILES
@ -147,6 +147,20 @@ set(SERVER_SOURCE_FILES
src/services/PermissionsService.cpp
src/services/ClientChannelService.cpp
src/music/PlaylistPermissions.cpp
src/lincense/LicenseService.cpp
src/groups/GroupManager.cpp
src/groups/GroupAssignmentManager.cpp
src/groups/Group.cpp
src/services/VirtualServerInformation.cpp
src/vserver/VirtualServerManager.cpp
src/services/VirtualServerBroadcastService.cpp
src/server/udp-server/UDPServer.cpp
src/client/voice/PacketEncoder.cpp
src/client/voice/PacketDecoder.cpp
src/client/voice/PingHandler.cpp
src/client/query/QueryClientConnection.cpp
)
if (COMPILE_WEB_CLIENT)
add_definitions(-DCOMPILE_WEB_CLIENT)
@ -160,7 +174,7 @@ if (COMPILE_WEB_CLIENT)
src/client/web/WSWebClient.cpp
src/client/web/SampleHandler.cpp
src/client/web/VoiceBridge.cpp
src/client/command_handler/helpers.h src/music/PlaylistPermissions.cpp src/music/PlaylistPermissions.h src/lincense/LicenseService.cpp src/lincense/LicenseService.h src/groups/GroupManager.cpp src/groups/GroupManager.h src/groups/GroupAssignmentManager.cpp src/groups/GroupAssignmentManager.h src/groups/Group.cpp src/groups/Group.h src/services/VirtualServerInformation.cpp src/services/VirtualServerInformation.h src/vserver/VirtualServerManager.cpp src/vserver/VirtualServerManager.h src/services/VirtualServerBroadcastService.cpp src/services/VirtualServerBroadcastService.h src/server/udp-server/UDPServer.cpp src/server/udp-server/UDPServer.h src/client/voice/PacketEncoder.cpp src/client/voice/PacketEncoder.h src/client/voice/PacketDecoder.cpp src/client/voice/PacketDecoder.h src/client/voice/PingHandler.cpp src/client/voice/PingHandler.h)
)
endif ()
add_executable(PermHelper helpers/permgen.cpp)
@ -276,7 +290,6 @@ target_link_libraries(TeaSpeakServer
#Require a so
sqlite3
DataPipes::rtc::shared
breakpad::static
protobuf::libprotobuf
@ -290,8 +303,10 @@ target_link_libraries(TeaSpeakServer
)
if (COMPILE_WEB_CLIENT)
target_link_libraries(TeaSpeakServer ${glib20_DIR}/lib/x86_64-linux-gnu/libffi.so.7 ${nice_DIR}/lib/libnice.so.10)
endif ()
target_link_libraries(TeaSpeakServer DataPipes::rtc::shared ${glib20_DIR}/lib/x86_64-linux-gnu/libffi.so.7 ${nice_DIR}/lib/libnice.so.10)
else()
target_link_libraries(TeaSpeakServer DataPipes::core::shared)
endif()
# include_directories(${LIBRARY_PATH}/boringssl/include/)
target_link_libraries(TeaSpeakServer

View File

@ -7,6 +7,7 @@ General lock order:
When executing a command:
Lock order:
- Client execute lock
- Client state lock
- Server state lock (Server should not try to change state while a client is executing something)
Notes:
The server might be null or the default server.

9
server/namespaces Normal file
View File

@ -0,0 +1,9 @@
The general namespace prefix is ts::
TeaSpeak - Server: ts::server
Basic: ts::server
Sub-Server:
Query: ts::server::server::query
Voice: ts::server::server::udp
File: ts::server::server::file
Web: ts::server::server::web

View File

@ -23,6 +23,7 @@
#include <misc/strobf.h>
#include <jemalloc/jemalloc.h>
#include <protocol/buffers.h>
#include "src/server/udp-server/UDPServer.h"
#ifndef _POSIX_SOURCE
#define _POSIX_SOURCE
@ -389,6 +390,12 @@ FwIDAQAB
}
}
this->udpServer = new server::udp::Server{};
if(std::string error{}; !this->udpServer->initialize(error)) {
logCritical(LOG_INSTANCE, "Failed to allocate UDP server.");
return false;
}
this->voiceServerManager = new VirtualServerManager(this);
if (!this->voiceServerManager->initialize(true)) {
logCritical(LOG_INSTANCE, "Could not load servers!");
@ -433,22 +440,22 @@ void InstanceHandler::stopInstance() {
debugMessage(LOG_INSTANCE, "Stopping all virtual servers");
if (this->voiceServerManager)
this->voiceServerManager->shutdownAll(ts::config::messages::applicationStopped);
delete this->voiceServerManager;
this->voiceServerManager = nullptr;
delete std::exchange(this->voiceServerManager, nullptr);
debugMessage(LOG_INSTANCE, "All virtual server stopped");
debugMessage(LOG_QUERY, "Stopping query server");
if (this->queryServer) this->queryServer->stop();
delete this->queryServer;
this->queryServer = nullptr;
delete std::exchange(this->queryServer, nullptr);
debugMessage(LOG_QUERY, "Query server stopped");
debugMessage(LOG_FT, "Stopping file server");
if (this->fileServer) this->fileServer->stop();
delete this->fileServer;
this->fileServer = nullptr;
delete std::exchange(this->fileServer, nullptr);
debugMessage(LOG_FT, "File server stopped");
if(this->udpServer) this->udpServer->finalize();
delete std::exchange(this->udpServer, nullptr);
this->save_channel_permissions();
this->save_group_permissions();

View File

@ -23,6 +23,10 @@ namespace ts {
class LicenseService;
}
namespace server::udp {
class Server;
}
class InstanceHandler {
public:
explicit InstanceHandler(SqlDataManager*);
@ -49,6 +53,8 @@ namespace ts {
ssl::SSLManager* sslManager(){ return this->sslMgr; }
sql::SqlManager* getSql(){ return sql->sql(); }
[[nodiscard]] inline auto udp_server() { return this->udpServer; }
std::chrono::time_point<std::chrono::system_clock> getStartTimestamp(){ return startTimestamp; }
void executeTick(VirtualServer*);
@ -112,6 +118,8 @@ namespace ts {
FileServer* fileServer = nullptr;
QueryServer* queryServer = nullptr;
server::udp::Server* udpServer{nullptr};
VirtualServerManager* voiceServerManager = nullptr;
DatabaseHelper* dbHelper = nullptr;
bans::BanManager* banMgr = nullptr;

View File

@ -88,7 +88,7 @@ bool VirtualServer::registerClient(shared_ptr<ConnectedClient> client) {
bool VirtualServer::unregisterClient(shared_ptr<ConnectedClient> cl, std::string reason, std::unique_lock<std::shared_mutex>& chan_tree_lock) {
if(cl->getType() == ClientType::CLIENT_TEAMSPEAK && cl->getType() == ClientType::CLIENT_WEB) {
sassert(cl->state == ConnectionState::DISCONNECTED);
sassert(cl->state == ClientState::DISCONNECTED);
}
auto client_id = cl->getClientId();
@ -128,7 +128,7 @@ bool VirtualServer::unregisterClient(shared_ptr<ConnectedClient> cl, std::string
}
void VirtualServer::registerInternalClient(std::shared_ptr<ConnectedClient> client) {
client->state = ConnectionState::CONNECTED;
client->state = ClientState::CONNECTED;
{
lock_guard lock(this->clients.lock);
if(client->getClientId() > 0) {
@ -152,7 +152,7 @@ void VirtualServer::registerInternalClient(std::shared_ptr<ConnectedClient> clie
}
void VirtualServer::unregisterInternalClient(std::shared_ptr<ConnectedClient> client) {
client->state = ConnectionState::DISCONNECTED;
client->state = ClientState::DISCONNECTED;
{
auto client_id = client->getClientId();

View File

@ -24,6 +24,7 @@
#include "Configuration.h"
#include "VirtualServer.h"
#include "src/manager/ConversationManager.h"
#include "src/server/udp-server/UDPServer.h"
#include <misc/sassert.h>
using namespace std;
@ -313,64 +314,65 @@ bool VirtualServer::start(std::string& error) {
}
}
auto host = this->properties()[property::VIRTUALSERVER_HOST].as<string>();
if(config::binding::enforce_default_voice_host)
host = config::binding::DefaultVoiceHost;
{
auto host = this->properties()[property::VIRTUALSERVER_HOST].as<string>();
if(config::binding::enforce_default_voice_host)
host = config::binding::DefaultVoiceHost;
if(host.empty()){
error = "invalid host (\"" + host + "\")";
this->stop("failed to start", true);
return false;
}
if(this->properties()[property::VIRTUALSERVER_PORT].as<uint16_t>() <= 0){
error = "invalid port";
this->stop("failed to start", true);
return false;
}
deque<shared_ptr<VoiceServerBinding>> bindings;
for(const auto& address : split_hosts(host, ',')) {
auto entry = make_shared<VoiceServerBinding>();
if(net::is_ipv4(address)) {
sockaddr_in addr{};
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as<uint16_t>());
if(!evaluateAddress4(address, addr.sin_addr)) {
logError(this->serverId, "Fail to resolve v4 address info for \"{}\"", address);
continue;
}
memcpy(&entry->address, &addr, sizeof(addr));
} else if(net::is_ipv6(address)) {
sockaddr_in6 addr{};
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as<uint16_t>());
if(!evaluateAddress6(address, addr.sin6_addr)) {
logError(this->serverId, "Fail to resolve v6 address info for \"{}\"", address);
continue;
}
memcpy(&entry->address, &addr, sizeof(addr));
} else {
logError(this->serverId, "Failed to determinate address type for \"{}\"", address);
continue;
if(host.empty()){
error = "invalid host (\"" + host + "\")";
this->stop("failed to start", true);
return false;
}
if(this->properties()[property::VIRTUALSERVER_PORT].as<uint16_t>() <= 0){
error = "invalid port";
this->stop("failed to start", true);
return false;
}
bindings.push_back(entry);
}
if(bindings.empty()) {
error = "failed to resole any host!";
this->stop("failed to start", false);
return false;
}
//Setup voice server
udpVoiceServer = make_shared<VoiceServer>(self.lock());
if(!udpVoiceServer->start(bindings, error)) {
error = "could not start voice server. Message: " + error;
this->stop("failed to start", false);
return false;
deque<shared_ptr<VoiceServerBinding>> bindings;
for(const auto& address : split_hosts(host, ',')) {
auto entry = make_shared<VoiceServerBinding>();
if(net::is_ipv4(address)) {
sockaddr_in addr{};
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as<uint16_t>());
if(!evaluateAddress4(address, addr.sin_addr)) {
logError(this->serverId, "Fail to resolve v4 address info for \"{}\"", address);
continue;
}
memcpy(&entry->address, &addr, sizeof(addr));
} else if(net::is_ipv6(address)) {
sockaddr_in6 addr{};
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(this->properties()[property::VIRTUALSERVER_PORT].as<uint16_t>());
if(!evaluateAddress6(address, addr.sin6_addr)) {
logError(this->serverId, "Fail to resolve v6 address info for \"{}\"", address);
continue;
}
memcpy(&entry->address, &addr, sizeof(addr));
} else {
logError(this->serverId, "Failed to determinate address type for \"{}\"", address);
continue;
}
bindings.push_back(entry);
}
if(bindings.empty()) {
error = "failed to resole any host!";
this->stop("failed to start", false);
return false;
}
auto result = serverInstance->udp_server()->register_virtual_server(this);
if(result != server::udp::ServerRegisterResult::SUCCESS) {
error = "failed to start udp voice server (" + std::to_string((int) result) + ")";
this->stop("failed to start", false);
return false;
}
}
if(ts::config::web::activated && serverInstance->sslManager()->web_ssl_options()) {
@ -501,9 +503,7 @@ void VirtualServer::stop(const std::string& reason, bool disconnect_query) {
this->musicManager->disconnectBots();
serverInstance->cancelExecute(this);
if(this->udpVoiceServer) this->udpVoiceServer->stop();
this->udpVoiceServer = nullptr;
serverInstance->udp_server()->unregister_virtual_server(this);
#ifdef COMPILE_WEB_CLIENT
if(this->webControlServer) this->webControlServer->stop();
@ -637,30 +637,26 @@ std::shared_ptr<ConnectedClient> VirtualServer::findClient(std::string name, boo
}
bool VirtualServer::forEachClient(std::function<void(std::shared_ptr<ConnectedClient>)> function) {
for(const auto& elm : this->getClients()) {
shared_lock close_lock(elm->finalDisconnectLock, try_to_lock_t{});
if(close_lock.owns_lock()) //If not locked than client is on the way to disconnect
if(elm->state == ConnectionState::CONNECTED && elm->getType() != ClientType::CLIENT_INTERNAL) {
function(elm);
}
}
for(const auto& elm : this->getClients())
if(elm->state == ClientState::CONNECTED && elm->getType() != ClientType::CLIENT_INTERNAL)
function(elm);
return true;
}
std::vector<std::shared_ptr<ConnectedClient>> VirtualServer::getClients() {
vector<shared_ptr<ConnectedClient>> clients;
vector<shared_ptr<ConnectedClient>> result{};
{
lock_guard lock(this->clients.lock);
clients.reserve(this->clients.count);
result.reserve(this->clients.count);
for(auto& client : this->clients.clients) {
if(!client) continue;
clients.push_back(client);
result.push_back(client);
}
}
return clients;
return result;
}
deque<shared_ptr<ConnectedClient>> VirtualServer::getClientsByChannel(std::shared_ptr<BasicChannel> channel) {
@ -677,7 +673,7 @@ deque<shared_ptr<ConnectedClient>> VirtualServer::getClientsByChannel(std::share
for(const auto& weak_client : weak_clients) {
auto client = weak_client.lock();
if(!client) continue;
if(client->connectionState() != ConnectionState::CONNECTED) continue;
if(client->connectionState() != ClientState::CONNECTED) continue;
if(client->getChannel() != channel) continue; /* to be sure */
result.push_back(move(client));
@ -1189,7 +1185,7 @@ void VirtualServer::send_text_message(const std::shared_ptr<BasicChannel> &chann
bool conversation_private = channel->properties()[property::CHANNEL_FLAG_CONVERSATION_PRIVATE].as<bool>();
auto flag_password = channel->properties()[property::CHANNEL_FLAG_PASSWORD].as<bool>();
for(const auto& client : this->getClients()) {
if(client->connectionState() != ConnectionState::CONNECTED)
if(client->connectionState() != ClientState::CONNECTED)
continue;
auto type = client->getType();

View File

@ -204,7 +204,6 @@ namespace ts {
std::shared_ptr<stats::ConnectionStatistics> getServerStatistics(){ return serverStatistics; }
std::shared_ptr<VoiceServer> getVoiceServer(){ return this->udpVoiceServer; }
WebControlServer* getWebServer(){ return this->webControlServer; }
/* calculate permissions for an client in this server */
@ -288,7 +287,6 @@ namespace ts {
std::chrono::system_clock::time_point lastTick;
void executeServerTick();
std::shared_ptr<VoiceServer> udpVoiceServer = nullptr;
WebControlServer* webControlServer = nullptr;
ts::server::tokens::TokenManager* tokenManager = nullptr;
ComplainManager* complains = nullptr;

View File

@ -444,6 +444,6 @@ void VirtualServerManager::tickHandshakeClients() {
for(const auto& server : this->serverInstances()) {
auto vserver = server->getVoiceServer();
if(vserver)
vserver->tickHandshakingClients();
vserver->tickClients();
}
}

View File

@ -535,7 +535,7 @@ bool ConnectedClient::notifyClientLeftViewBanned(const std::shared_ptr<Connected
}
bool ConnectedClient::sendNeededPermissions(bool enforce) {
if(!enforce && this->state != ConnectionState::CONNECTED) return false;
if(!enforce && this->state != ClientState::CONNECTED) return false;
if(!enforce && chrono::system_clock::now() - this->lastNeededNotify < chrono::seconds(5) && this->lastNeededPermissionNotifyChannel == this->currentChannel) { //Dont spam these (hang up ui)
this->requireNeededPermissionResend = true;
@ -720,7 +720,7 @@ void ConnectedClient::sendChannelDescription(const std::shared_ptr<BasicChannel>
void ConnectedClient::tick(const std::chrono::system_clock::time_point &time) {
ALARM_TIMER(A1, "ConnectedClient::tick", milliseconds(2));
if(this->state == ConnectionState::CONNECTED) {
if(this->state == ClientState::CONNECTED) {
if(this->requireNeededPermissionResend)
this->sendNeededPermissions(false);
if(this->lastOnlineTimestamp.time_since_epoch().count() == 0) {
@ -833,7 +833,7 @@ bool ConnectedClient::handleCommandFull(Command& cmd, bool disconnectOnFail) {
if(generateReturnStatus)
this->notifyError(result, cmd["return_code"].size() > 0 ? cmd["return_code"].first().as<std::string>() : "");
if(result.error_code() != error::ok && this->state == ConnectionState::INIT_HIGH)
if(result.error_code() != error::ok && this->state == ClientState::INITIALIZING)
this->close_connection(system_clock::now()); //Disconnect now
for (const auto& handler : postCommandHandler)

View File

@ -68,14 +68,13 @@ namespace ts {
friend class QueryServer;
friend class DataClient;
friend class SpeakingClient;
friend class connection::VoiceClientConnection;
friend class ts::GroupManager;
friend class VirtualServerManager;
public:
explicit ConnectedClient(sql::SqlManager*, const std::shared_ptr<VirtualServer>& server);
~ConnectedClient() override;
ConnectionState connectionState(){ return this->state; }
ClientState connectionState(){ return this->state; }
std::string getLoggingPeerIp() { return config::server::disable_ip_saving || (this->server && this->server->disable_ip_saving()) ? "X.X.X.X" : this->getPeerIp(); }
std::string getPeerIp(){ return this->isAddressV4() ? net::to_string(this->getAddressV4()->sin_addr) : this->isAddressV6() ? net::to_string(this->getAddressV6()->sin6_addr) : "localhost"; }
uint16_t getPeerPort(){ return ntohs(this->isAddressV4() ? this->getAddressV4()->sin_port : this->isAddressV6() ? this->getAddressV6()->sin6_port : (uint16_t) 0); }
@ -273,29 +272,14 @@ namespace ts {
*/
bool update_cached_permissions();
std::shared_lock<std::shared_mutex> require_connected_state(bool blocking = false) {
//try_to_lock_t
std::shared_lock<std::shared_mutex> disconnect_lock{};
if(blocking) [[unlikely]]
disconnect_lock = std::shared_lock{this->finalDisconnectLock};
else
disconnect_lock = std::shared_lock{this->finalDisconnectLock, std::try_to_lock};
if(!disconnect_lock) [[unlikely]]
return disconnect_lock;
{
std::lock_guard state_lock{this->state_lock};
if(this->state != ConnectionState::CONNECTED)
return {};
}
return disconnect_lock;
}
inline bool playlist_subscribed(const std::shared_ptr<ts::music::Playlist>& playlist) const {
return this->_subscribed_playlist.lock() == playlist;
}
[[nodiscard]] auto require_connected_state() {
return ts::rwshared_lock{this->state_lock};
}
template <typename T = std::lock_guard<threads::Mutex>>
[[nodiscard]] inline auto lock_command_handling() { return T{this->command_lock}; }
void increase_join_state() { this->join_state_id++; }
@ -304,13 +288,11 @@ namespace ts {
sockaddr_storage remote_address;
//General states
std::mutex state_lock;
ConnectionState state{ConnectionState::UNKNWON};
ts::rw_mutex state_lock{};
ClientState state{ClientState::UNKNWON};
bool allowedToTalk = false;
std::shared_mutex finalDisconnectLock; /* locked before state lock! */
std::deque<std::weak_ptr<ConnectedClient>> visibleClients{}; /* variable locked with channel_lock */
std::deque<std::weak_ptr<ConnectedClient>> mutedClients{}; /* variable locked with channel_lock */
std::deque<std::weak_ptr<ConnectedClient>> openChats{}; /* variable locked with channel_lock */
@ -647,7 +629,7 @@ namespace ts {
this->connection_lock = std::move(other.connection_lock);
}
inline bool valid() const { return !!this->client && !!this->connection_lock; }
inline bool valid() const { return !!this->client && this->connection_lock.shared_locked() && client->connectionState() == ClientState::CONNECTED; }
inline operator bool() const { return this->valid(); }
@ -661,7 +643,7 @@ namespace ts {
T &operator*() { return *this->client; }
std::shared_ptr<T> client;
std::shared_lock<std::shared_mutex> connection_lock{};
ts::rwshared_lock<ts::rw_mutex> connection_lock{};
};
}
}

View File

@ -629,6 +629,7 @@ bool ConnectedClient::handle_text_command(
if(!vc) return false;
send_message(_this.lock(), "Packet generations:");
auto& id_generator = vc->getConnection()->packet_encoder().id_generator();
for(const auto& type : {
protocol::PacketTypeInfo::Command,
protocol::PacketTypeInfo::CommandLow,
@ -639,8 +640,8 @@ bool ConnectedClient::handle_text_command(
protocol::PacketTypeInfo::Ping,
protocol::PacketTypeInfo::Pong}) {
auto id = vc->getConnection()->getPacketIdManager().currentPacketId(type);
auto gen = vc->getConnection()->getPacketIdManager().generationId(type);
auto id = id_generator.currentPacketId(type);
auto gen = id_generator.generationId(type);
send_message(_this.lock(), " OUT " + type.name() + " => generation: " + to_string(gen) + " id: " + to_string(id));
//auto& buffer = vc->getConnection()->packet_buffers()[type.type()];
//send_message(_this.lock(), " IN " + type.name() + " => generation: " + to_string(buffer.generation(0)) + " id: " + to_string(buffer.current_index()));

View File

@ -622,7 +622,7 @@ command_result SpeakingClient::handleCommandClientInit(Command& cmd) {
{
for(const auto &cl : this->server->getClients())
if((cl->getType() == CLIENT_TEAMSPEAK || cl->getType() == CLIENT_WEB || cl->getType() == CLIENT_TEASPEAK || cl->getType() == CLIENT_MUSIC))
if(cl->connectionState() <= ConnectionState::CONNECTED && cl->connectionState() >= ConnectionState::INIT_HIGH)
if(cl->connectionState() <= ClientState::CONNECTED && cl->connectionState() >= ClientState::CONNECTED) //TODO: Get "real" state and do not ignore just initializing clients!
count++;
}
@ -653,19 +653,8 @@ command_result SpeakingClient::handleCommandClientInit(Command& cmd) {
}
}
this->postCommandHandler.emplace_back([&](){
auto self = dynamic_pointer_cast<SpeakingClient>(_this.lock());
std::thread([self](){
if(self->state != ConnectionState::INIT_HIGH) return;
try {
self->processJoin();
} catch (std::exception& ex) {
logError(self->getServerId(), "Failed to proceed client join for {}. Got exception with message {}", CLIENT_STR_LOG_PREFIX_(self), ex.what());
self->close_connection(chrono::system_clock::now() + chrono::seconds{5});
}
}).detach();
});
/* we're not doing this anymore from a different thread */
this->processJoin();
debugMessage(this->getServerId(), "{} Client init timings: {}", CLIENT_STR_LOG_PREFIX, TIMING_FINISH(timings));
return command_result{error::ok};
}
@ -679,7 +668,7 @@ void SpeakingClient::processJoin() {
this->resetIdleTime();
threads::MutexLock lock(this->command_lock); //Don't process any commands!
if(this->state != ConnectionState::INIT_HIGH) {
if(this->state != ClientState::INITIALIZING) {
logError(this->getServerId(), "{} Invalid processJoin() connection state!", CLIENT_STR_LOG_PREFIX);
return;
}
@ -729,7 +718,7 @@ void SpeakingClient::processJoin() {
TIMING_STEP(timings, "assign chan");
this->sendChannelList(true);
this->state = ConnectionState::CONNECTED;
this->state = ClientState::CONNECTED;
TIMING_STEP(timings, "send chan t");
/* trick the join method */
@ -767,7 +756,7 @@ void SpeakingClient::processJoin() {
void SpeakingClient::processLeave() {
auto ownLock = _this.lock();
auto server = this->getServer();
if(server){
if(server) {
logMessage(this->getServerId(), "Voice client {}/{} ({}) from {} left.", this->getClientDatabaseId(), this->getUid(), this->getDisplayName(), this->getLoggingPeerIp() + ":" + to_string(this->getPeerPort()));
{
unique_lock server_channel_lock(this->server->channel_tree_lock);
@ -777,14 +766,6 @@ void SpeakingClient::processLeave() {
server->musicManager->cleanup_client_bots(this->getClientDatabaseId());
//ref_server = nullptr; Removed caused nullptr exceptions
}
{ //Delete own viewing clients
/*
* No need, are only weak references!
threads::MutexLock l(this->viewLock);
this->visibleClients.clear();
this->mutedClients.clear();
*/
}
}
void SpeakingClient::triggerVoiceEnd() {
@ -815,7 +796,7 @@ void SpeakingClient::tick(const std::chrono::system_clock::time_point &time) {
ALARM_TIMER(A1, "SpeakingClient::tick", milliseconds(2));
this->updateSpeak(true, time);
if(this->state == ConnectionState::CONNECTED) {
if(this->state == ClientState::CONNECTED) {
if(this->max_idle_time.has_value) {
auto max_idle = this->max_idle_time.value;
if(max_idle > 0 && this->idleTimestamp.time_since_epoch().count() > 0 && duration_cast<seconds>(time - this->idleTimestamp).count() > max_idle) {
@ -833,7 +814,7 @@ void SpeakingClient::updateChannelClientProperties(bool channel_lock, bool notif
}
command_result SpeakingClient::handleCommand(Command &command) {
if(this->connectionState() == ConnectionState::INIT_HIGH) {
if(this->connectionState() == ClientState::INITIALIZING) {
if(this->handshake.state == HandshakeState::BEGIN || this->handshake.state == HandshakeState::IDENTITY_PROOF) {
command_result result;
if(command.command() == "handshakebegin")

View File

@ -569,7 +569,7 @@ command_result ConnectedClient::handleCommandSendTextMessage(Command &cmd) {
if(this->handleTextMessage(ChatMessageMode::TEXTMODE_SERVER, cmd["msg"], nullptr)) return command_result{error::ok};
for(const auto& client : this->server->getClients()) {
if (client->connectionState() != ConnectionState::CONNECTED)
if (client->connectionState() != ClientState::CONNECTED)
continue;
auto type = client->getType();
@ -2526,7 +2526,7 @@ command_result ConnectedClient::handleCommandConversationMessageDelete(ts::Comma
auto delete_count = current_conversation->delete_messages(timestamp_end, limit, timestamp_begin, bulk["cldbid"]);
if(delete_count > 0) {
for(const auto& client : ref_server->getClients()) {
if(client->connectionState() != ConnectionState::CONNECTED)
if(client->connectionState() != ClientState::CONNECTED)
continue;
auto type = client->getType();

View File

@ -8,6 +8,7 @@
#include "src/InstanceHandler.h"
#include <pipes/errors.h>
#include <misc/std_unique_ptr.h>
#include "./QueryClientConnection.h"
using namespace std;
using namespace std::chrono;
@ -20,41 +21,20 @@ using namespace ts::server;
//#define DEBUG_TRAFFIC
QueryClient::QueryClient(QueryServer* handle, int sockfd) : ConnectedClient(handle->sql, nullptr), handle(handle), clientFd(sockfd) {
QueryClient::QueryClient(QueryServer* handle, int sockfd) : ConnectedClient(handle->sql, nullptr), handle(handle) {
memtrack::allocated<QueryClient>(this);
int enabled = 1;
int disabled = 0;
setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled));
if(setsockopt(sockfd, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) {
logError(this->getServerId(), "[Query] Could not disable nopush for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this), errno, strerror(errno));
}
if(setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &enabled, sizeof enabled) < 0) {
logError(this->getServerId(), "[Query] Could not disable no delay for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this), errno, strerror(errno));
}
this->readEvent = event_new(this->handle->eventLoop, this->clientFd, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryClient*) c)->handleMessageRead(a, b, c); }, this);
this->writeEvent = event_new(this->handle->eventLoop, this->clientFd, EV_WRITE, [](int a, short b, void* c){ ((QueryClient*) c)->handleMessageWrite(a, b, c); }, this);
this->connection = new server::query::QueryClientConnection{this, sockfd};
this->state = ConnectionState::CONNECTED;
this->state = ClientState::INITIALIZING;
connectedTimestamp = system_clock::now();
this->resetEventMask();
}
void QueryClient::applySelfLock(const std::shared_ptr<ts::server::QueryClient> &cl) {
bool QueryClient::initialize(std::string& error, const std::shared_ptr<ts::server::QueryClient> &cl) {
this->_this = cl;
}
QueryClient::~QueryClient() {
memtrack::freed<QueryClient>(this);
// if(this->closeLock.tryLock() != 0)
// logCritical("Query manager deleted, but is still in usage! (closeLock)");
// if(this->bufferLock.tryLock() != 0)
// logCritical("Query manager deleted, but is still in usage! (bufferLock)");
this->ssl_handler.finalize();
}
void QueryClient::preInitialize() {
this->properties()[property::CLIENT_TYPE] = ClientType::CLIENT_QUERY;
this->properties()[property::CLIENT_TYPE_EXACT] = ClientType::CLIENT_QUERY;
this->properties()[property::CLIENT_UNIQUE_IDENTIFIER] = "UnknownQuery";
@ -62,19 +42,24 @@ void QueryClient::preInitialize() {
DatabaseHelper::assignDatabaseId(this->sql, this->getServerId(), _this.lock());
if(ts::config::query::sslMode == 0) {
this->connectionType = ConnectionType::PLAIN;
this->postInitialize();
}
/* may already calls handle_connection_initialized() */
if(!this->connection->initialize(error))
return false;
return true;
}
void QueryClient::postInitialize() {
lock_guard<recursive_mutex> lock(this->lock_packet_handle); /* we dont want to handle anything while we're initializing */
QueryClient::~QueryClient() {
memtrack::freed<QueryClient>(this);
delete this->connection;
}
void QueryClient::handle_connection_initialized() {
std::lock_guard lock{this->command_lock}; /* we dont want to handle anything while we're initializing */
this->connectTimestamp = system_clock::now();
this->properties()[property::CLIENT_LASTCONNECTED] = duration_cast<seconds>(this->connectTimestamp.time_since_epoch()).count();
if(ts::config::query::sslMode == 1 && this->connectionType != ConnectionType::SSL_ENCRIPTED) {
if(ts::config::query::sslMode == 1 && this->connection->connection_type() != server::query::ConnectionType::SSL_ENCRYPTED) {
command_result error{error::failed_connection_initialisation, "Please use a SSL encryption!"};
this->notifyError(error);
error.release_details();
@ -104,7 +89,7 @@ void QueryClient::postInitialize() {
}
if(!this->whitelisted) {
threads::MutexLock lock(this->handle->loginLock);
threads::MutexLock llock(this->handle->loginLock);
if(this->handle->queryBann.count(this->getPeerIp()) > 0) {
auto ban = this->handle->queryBann[this->getPeerIp()];
Command cmd("error");
@ -120,12 +105,26 @@ void QueryClient::postInitialize() {
this->update_cached_permissions();
}
void QueryClient::writeMessage(const std::string& message) {
if(this->state == ConnectionState::DISCONNECTED || !this->handle) return;
void QueryClient::handle_connection_finalized() {
/* when this has been called there could not be any command executing! */
//TODO: Is this statement really true?
if(this->connectionType == ConnectionType::PLAIN) this->writeRawMessage(message);
else if(this->connectionType == ConnectionType::SSL_ENCRIPTED) this->ssl_handler.send(pipes::buffer_view{(void*) message.data(), message.length()});
else logCritical(LOG_GENERAL, "Invalid query connection type to write to!");
if(this->server) {
{
unique_lock channel_lock(this->server->channel_tree_lock);
this->server->unregisterClient(_this.lock(), "disconnected", channel_lock);
}
this->server->groups->disableCache(this->getClientDatabaseId());
this->server = nullptr;
}
if(this->handle)
this->handle->unregisterConnection(dynamic_pointer_cast<QueryClient>(_this.lock()));
}
void QueryClient::writeMessage(const std::string& message) {
if(this->state == ClientState::DISCONNECTED) return;
this->connection->send_data(message);
}
@ -142,331 +141,25 @@ bool QueryClient::close_connection(const std::chrono::system_clock::time_point&
auto ownLock = dynamic_pointer_cast<QueryClient>(_this.lock());
if(!ownLock) return false;
unique_lock<std::recursive_mutex> handleLock(this->lock_packet_handle);
unique_lock<threads::Mutex> lock(this->closeLock);
bool flushing = flushTimeout.time_since_epoch().count() != 0;
if(this->state == ConnectionState::DISCONNECTED || (flushing && this->state == ConnectionState::DISCONNECTING)) return false;
this->state = flushing ? ConnectionState::DISCONNECTING : ConnectionState::DISCONNECTED;
if(this->readEvent) { //Attention dont trigger this within the read thread!
event_del_block(this->readEvent);
event_free(this->readEvent);
this->readEvent = nullptr;
}
if(this->server){
{
unique_lock channel_lock(this->server->channel_tree_lock);
this->server->unregisterClient(_this.lock(), "disconnected", channel_lock);
}
this->server->groups->disableCache(this->getClientDatabaseId());
this->server = nullptr;
}
if(flushing){
this->flushThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [ownLock, flushTimeout](){
while(ownLock->state == ConnectionState::DISCONNECTING && flushTimeout > system_clock::now()){
{
std::lock_guard buffer_lock(ownLock->buffer_lock);
if(ownLock->readQueue.empty() && ownLock->writeQueue.empty()) break;
}
usleep(10 * 1000);
}
if(ownLock->state == ConnectionState::DISCONNECTING) ownLock->disconnectFinal();
});
flushThread->name("Flush thread QC").execute();
} else {
threads::MutexLock l1(this->flushThreadLock);
handleLock.unlock();
lock.unlock();
if(this->flushThread){
threads::NegatedMutexLock l(this->closeLock);
this->flushThread->join();
}
disconnectFinal();
}
this->connection->close_connection(flushTimeout);
return true;
}
void QueryClient::disconnectFinal() {
lock_guard<recursive_mutex> lock_tick(this->lock_query_tick);
lock_guard<recursive_mutex> lock_handle(this->lock_packet_handle);
threads::MutexLock lock_close(this->closeLock);
std::unique_lock buffer_lock(this->buffer_lock, try_to_lock);
bool QueryClient::process_next_command() {
using CommandAssembleState = server::query::CommandAssembleState;
if(final_disconnected) {
logError(LOG_QUERY, "Tried to disconnect a client twice!");
return;
}
final_disconnected = true;
this->state = ConnectionState::DISCONNECTED;
{
threads::MutexTryLock l(this->flushThreadLock);
if(!!l) {
if(this->flushThread) {
this->flushThread->detach();
delete this->flushThread; //Release the captured this lock
this->flushThread = nullptr;
}
}
}
lock_guard clock(this->command_lock);
if(!this->handle || this->state == ClientState::DISCONNECTED) return false;
if(this->writeEvent) {
event_del_block(this->writeEvent);
event_free(this->writeEvent);
this->writeEvent = nullptr;
}
if(this->readEvent) {
event_del_block(this->readEvent);
event_free(this->readEvent);
this->readEvent = nullptr;
}
if(this->clientFd > 0) {
if(shutdown(this->clientFd, SHUT_RDWR) < 0)
debugMessage(LOG_QUERY, "Could not shutdown query client socket! {} ({})", errno, strerror(errno));
if(close(this->clientFd) < 0)
debugMessage(LOG_QUERY, "Failed to close the query client socket! {} ({})", errno, strerror(errno));
this->clientFd = -1;
}
if(this->server) {
{
unique_lock channel_lock(this->server->channel_tree_lock);
this->server->unregisterClient(_this.lock(), "disconnected", channel_lock);
}
this->server->groups->disableCache(this->getClientDatabaseId());
this->server = nullptr;
}
this->readQueue.clear();
this->writeQueue.clear();
if(this->handle)
this->handle->unregisterConnection(dynamic_pointer_cast<QueryClient>(_this.lock()));
}
void QueryClient::writeRawMessage(const std::string &message) {
{
std::lock_guard lock(this->buffer_lock);
this->writeQueue.push_back(message);
}
if(this->writeEvent) event_add(this->writeEvent, nullptr);
}
void QueryClient::handleMessageWrite(int fd, short, void *) {
auto ownLock = _this.lock();
std::unique_lock buffer_lock(this->buffer_lock, try_to_lock);
if(this->state == ConnectionState::DISCONNECTED) return;
if(!buffer_lock.owns_lock()) {
if(this->writeEvent)
event_add(this->writeEvent, nullptr);
return;
}
int writes = 0;
string buffer;
while(writes < 10 && !this->writeQueue.empty()) {
if(buffer.empty()) {
buffer = std::move(this->writeQueue.front());
this->writeQueue.pop_front();
}
auto length = send(fd, buffer.data(), buffer.length(), MSG_NOSIGNAL);
#ifdef DEBUG_TRAFFIC
debugMessage("Write " + to_string(buffer.length()));
hexDump((void *) buffer.data(), buffer.length());
#endif
if(length == -1) {
if (errno == EINTR || errno == EAGAIN) {
if(this->writeEvent)
event_add(this->writeEvent, nullptr);
return;
}
else {
logError(LOG_QUERY, "{} Failed to write message: {} ({} => {})", CLIENT_STR_LOG_PREFIX, length, errno, strerror(errno));
threads::Thread([=](){ ownLock->close_connection(chrono::system_clock::now() + chrono::seconds{5}); }).detach();
return;
}
} else {
if(buffer.length() == length)
buffer = "";
else
buffer = buffer.substr(length);
}
writes++;
}
if(!buffer.empty())
this->writeQueue.push_front(buffer);
if(!this->writeQueue.empty() && this->writeEvent)
event_add(this->writeEvent, nullptr);
}
void QueryClient::handleMessageRead(int fd, short, void *) {
auto ownLock = dynamic_pointer_cast<QueryClient>(_this.lock());
if(!ownLock) {
logCritical(LOG_QUERY, "Could not get own lock!");
return;
}
string buffer(1024, 0);
auto length = read(fd, (void*) buffer.data(), buffer.length());
if(length <= 0){
if(errno == EINTR || errno == EAGAIN)
;//event_add(this->readEvent, nullptr);
else if(length == 0 && errno == 0) {
logMessage(LOG_QUERY, "{} Connection closed. Client disconnected.", CLIENT_STR_LOG_PREFIX);
event_del_noblock(this->readEvent);
std::thread([ownLock]{
ownLock->close_connection();
}).detach();
} else {
logError(LOG_QUERY, "{} Failed to read! Code: {} errno: {} message: {}", CLIENT_STR_LOG_PREFIX, length, errno, strerror(errno));
event_del_noblock(this->readEvent);
threads::Thread(THREAD_SAVE_OPERATIONS, [ownLock](){ ownLock->close_connection(); }).detach();
}
return;
}
buffer.resize(length);
{
std::lock_guard buffer_lock(this->buffer_lock);
if(this->state == ConnectionState::DISCONNECTED)
return;
this->readQueue.push_back(std::move(buffer));
#ifdef DEBUG_TRAFFIC
debugMessage("Read " + to_string(buffer.length()));
hexDump((void *) buffer.data(), buffer.length());
#endif
}
if(this->handle)
this->handle->executePool()->execute([ownLock]() {
int counter = 0;
while(ownLock->tickIOMessageProgress() && counter++ < 15);
});
}
bool QueryClient::tickIOMessageProgress() {
lock_guard<recursive_mutex> lock(this->lock_packet_handle);
if(!this->handle || this->state == ConnectionState::DISCONNECTED || this->state == ConnectionState::DISCONNECTING) return false;
string message;
bool next = false;
{
std::lock_guard buffer_lock(this->buffer_lock);
if(this->readQueue.empty()) return false;
message = std::move(this->readQueue.front());
this->readQueue.pop_front();
next |= this->readQueue.empty();
}
if(this->connectionType == ConnectionType::PLAIN) {
int count = 0;
while(this->handleMessage(pipes::buffer_view{(void*) message.data(), message.length()}) && count++ < 15) message = "";
next |= count == 15;
} else if(this->connectionType == ConnectionType::SSL_ENCRIPTED) {
this->ssl_handler.process_incoming_data(pipes::buffer_view{(void*) message.data(), message.length()});
} else if(this->connectionType == ConnectionType::UNKNOWN) {
if(config::query::sslMode != 0 && pipes::SSL::isSSLHeader(message)) {
this->initializeSSL();
/*
* - Content
* \x16
* -Version (1)
* \x03 \x00
* - length (2)
* \x00 \x04
*
* - Header
* \x00 -> hello request (3)
* \x05 -> length (4)
*/
//this->writeRawMessage(string("\x16\x03\x01\x00\x05\x00\x00\x00\x00\x00", 10));
} else {
this->connectionType = ConnectionType::PLAIN;
this->postInitialize();
}
next = true;
{
std::lock_guard buffer_lock(this->buffer_lock);
this->readQueue.push_front(std::move(message));
}
}
return next;
}
extern InstanceHandler* serverInstance;
void QueryClient::initializeSSL() {
this->connectionType = ConnectionType::SSL_ENCRIPTED;
this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_OUT, true);
this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_IN, true);
this->ssl_handler.callback_data(std::bind(&QueryClient::handleMessage, this, placeholders::_1));
this->ssl_handler.callback_write(std::bind(&QueryClient::writeRawMessage, this, placeholders::_1));
this->ssl_handler.callback_initialized = std::bind(&QueryClient::postInitialize, this);
this->ssl_handler.callback_error([&](int code, const std::string& message) {
if(code == PERROR_SSL_ACCEPT) {
this->disconnect("invalid accept");
} else if(code == PERROR_SSL_TIMEOUT)
this->disconnect("invalid accept (timeout)");
else
logError(LOG_QUERY, "Got unknown ssl error ({} | {})", code, message);
});
{
auto context = serverInstance->sslManager()->getQueryContext();
auto options = make_shared<pipes::SSL::Options>();
options->type = pipes::SSL::SERVER;
options->context_method = TLS_method();
options->default_keypair({context->privateKey, context->certificate});
if(!this->ssl_handler.initialize(options)) {
logError(LOG_QUERY, "[{}] Failed to setup ssl!", CLIENT_STR_LOG_PREFIX);
}
}
}
bool QueryClient::handleMessage(const pipes::buffer_view& message) {
{
threads::MutexLock l(this->closeLock);
if(this->state == ConnectionState::DISCONNECTED)
return false;
}
#ifdef DEBUG_TRAFFIC
debugMessage("Handling message " + to_string(message.length()));
hexDump((void *) message.data(), message.length());
#endif
string command;
{
this->lineBuffer += message.string();
int length = 2;
auto pos = this->lineBuffer.find("\r\n");
if(pos == string::npos) pos = this->lineBuffer.find("\n\r");
if(pos == string::npos) {
length = 1;
pos = this->lineBuffer.find('\n');
}
if(pos != string::npos){
command = this->lineBuffer.substr(0, pos);
if(this->lineBuffer.size() > pos + length)
this->lineBuffer = this->lineBuffer.substr(pos + length);
else
this->lineBuffer.clear();
}
if(pos == string::npos) return false;
string command{};
bool more_pending{false};
switch (this->connection->next_command(command)) {
case CommandAssembleState::MORE_COMMANDS_PENDING:
more_pending = true;
case CommandAssembleState::SUCCESS:
break;
case CommandAssembleState::NO_COMMAND_PENDING:
return false; /* nothing to do */
}
if(command.empty() || command.find_first_not_of(' ') == string::npos) { //Empty command
@ -515,7 +208,7 @@ bool QueryClient::handleMessage(const pipes::buffer_view& message) {
error = command_result{error::vs_critical, std::string{ex.what()}};
goto handle_error;
}
return true;
return more_pending;
handle_error:
this->notifyError(error);
@ -539,15 +232,13 @@ void QueryClient::tick(const std::chrono::system_clock::time_point &time) {
}
void QueryClient::queryTick() {
lock_guard<recursive_mutex> lock_tick(this->lock_query_tick);
if(this->idleTimestamp.time_since_epoch().count() > 0 && system_clock::now() - this->idleTimestamp > minutes(5)){
debugMessage(LOG_QUERY, "Dropping client " + this->getLoggingPeerIp() + "|" + this->getDisplayName() + ". (Timeout)");
this->close_connection(system_clock::now() + seconds(1));
}
if(this->connectionType == ConnectionType::UNKNOWN && system_clock::now() - milliseconds(500) > connectedTimestamp) {
this->connectionType = ConnectionType::PLAIN;
this->postInitialize();
if(this->connection->connection_type() == server::query::ConnectionType::UNKNOWN && system_clock::now() - milliseconds{500} > connectedTimestamp) {
this->connection->enforce_text_connection();
}
}

View File

@ -10,19 +10,18 @@ namespace ts::server {
class QueryServer;
class QueryAccount;
namespace server::query {
class QueryClientConnection;
}
class QueryClient : public ConnectedClient {
friend class QueryServer;
enum ConnectionType {
PLAIN,
SSL_ENCRIPTED,
UNKNOWN
};
friend class server::query::QueryClientConnection;
public:
QueryClient(QueryServer*, int sockfd);
~QueryClient() override;
[[nodiscard]] inline QueryServer* getQueryServer() { return this->handle; }
void writeMessage(const std::string&);
void sendCommand(const ts::Command &command, bool low = false) override;
@ -30,7 +29,6 @@ namespace ts::server {
bool disconnect(const std::string &reason) override;
bool close_connection(const std::chrono::system_clock::time_point& timeout = std::chrono::system_clock::time_point()) override;
void disconnectFinal();
bool eventActive(QueryEventGroup, QueryEventSpecifier);
void toggleEvent(QueryEventGroup, QueryEventSpecifier, bool);
@ -41,51 +39,26 @@ namespace ts::server {
inline std::shared_ptr<QueryAccount> getQueryAccount() { return this->query_account; }
protected:
void preInitialize();
void postInitialize();
/* Will be called as soon the connection has been initialized. This means directly within the initialize call or within the IO read callback. */
void handle_connection_initialized();
void handle_connection_finalized();
void tick(const std::chrono::system_clock::time_point &time) override;
void queryTick();
protected:
void initializeSSL();
/* returns true if more commands are pending */
bool process_next_command();
bool handleMessage(const pipes::buffer_view&);
bool tickIOMessageProgress();
void handleMessageRead(int, short, void*);
void handleMessageWrite(int, short, void*);
void writeRawMessage(const std::string&);
void applySelfLock(const std::shared_ptr<QueryClient> &cl);
[[nodiscard]] bool initialize(std::string& /* error */, const std::shared_ptr<QueryClient> &cl);
private:
QueryServer* handle;
ConnectionType connectionType = ConnectionType::UNKNOWN;
server::query::QueryClientConnection* connection{nullptr};
bool whitelisted = false;
int clientFd = -1;
::event* readEvent = nullptr;
::event* writeEvent = nullptr;
threads::Mutex closeLock;
pipes::SSL ssl_handler;
std::mutex buffer_lock;
std::deque<std::string> writeQueue;
std::deque<std::string> readQueue;
threads::Mutex flushThreadLock;
threads::Thread* flushThread = nullptr;
bool final_disconnected = false;
std::string lineBuffer;
std::chrono::time_point<std::chrono::system_clock> connectedTimestamp;
uint16_t eventMask[QueryEventGroup::QEVENTGROUP_MAX];
std::recursive_mutex lock_packet_handle;
std::recursive_mutex lock_query_tick;
std::shared_ptr<QueryAccount> query_account;
protected:
command_result handleCommand(Command &command) override;

View File

@ -0,0 +1,384 @@
//
// Created by WolverinDEV on 11/03/2020.
//
#include "./QueryClientConnection.h"
#include <netinet/tcp.h>
#include <log/LogUtils.h>
#include <pipes/errors.h>
#include <src/InstanceHandler.h>
#include "./QueryClient.h"
#include "../ConnectedClient.h"
#include "../../server/QueryServer.h"
#include "QueryClientConnection.h"
using namespace ts::server::server::query;
#if defined(TCP_CORK) && !defined(TCP_NOPUSH)
#define TCP_NOPUSH TCP_CORK
#endif
namespace ts::server::server::query {
/* will be set by the event loop */
thread_local bool thread_is_event_loop{false};
}
QueryClientConnection::QueryClientConnection(ts::server::QueryClient *client, int fd) : client_handle{client}, file_descriptor_{fd} {
TAILQ_INIT(&this->write_queue);
}
QueryClientConnection::~QueryClientConnection() {
this->finalize(true);
}
bool QueryClientConnection::initialize(std::string &error) {
assert(this->client_handle);
int enabled{1};
int disabled{0};
setsockopt(this->file_descriptor_, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled));
if(setsockopt(this->file_descriptor_, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0)
logError(LOG_QUERY, "Could not disable nopush for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this->client_handle), errno, strerror(errno));
if(setsockopt(this->file_descriptor_, IPPROTO_TCP, TCP_NODELAY, &enabled, sizeof enabled) < 0)
logError(LOG_QUERY, "[Query] Could not disable no delay for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this->client_handle), errno, strerror(errno));
auto query_server = this->client_handle->getQueryServer();
this->readEvent = event_new(query_server->io_event_loop(), this->file_descriptor_, EV_READ | EV_PERSIST, [](int a1, short a2, void* _this) {
reinterpret_cast<QueryClientConnection*>(_this)->handle_event_read(a1, a2);
}, this);
this->writeEvent = event_new(query_server->io_event_loop(), this->file_descriptor_, EV_WRITE, [](int a1, short a2, void* _this){
reinterpret_cast<QueryClientConnection*>(_this)->handle_event_write(a1, a2);
}, this);
this->connection_state = ConnectionState::INITIALIZING;
if(ts::config::query::sslMode == 0) {
this->connection_state = ConnectionState::CONNECTED;
this->connection_type_ = ConnectionType::PLAIN_TEXT;
this->client_handle->handle_connection_initialized();
}
return true;
}
void QueryClientConnection::add_read_event() {
std::lock_guard elock{this->event_mutex};
if(this->readEvent) event_add(this->readEvent, nullptr);
}
void QueryClientConnection::finalize(bool is_destructor_call) {
auto old_state = this->connection_state;
this->connection_state = ConnectionState::DISCONNECTED;
/* unregister event handling */
{
std::unique_lock elock{this->event_mutex};
auto wevent = std::exchange(this->writeEvent, nullptr);
auto revent = std::exchange(this->readEvent, nullptr);
elock.unlock();
if(revent) {
if(thread_is_event_loop)
event_del_noblock(revent);
else
event_del_block(revent); /* may calls finalize() while we're waiting. But thats okey. */
event_free(revent);
}
if(wevent) {
if(thread_is_event_loop)
event_del_noblock(wevent);
else
event_del_block(wevent); /* may calls finalize() while we're waiting. But thats okey. */
event_free(wevent);
}
}
{
std::lock_guard block{this->buffer_lock};
/* Free the entire tail queue. */
while (auto buffer = TAILQ_FIRST(&this->write_queue)) {
TAILQ_REMOVE(&this->write_queue, buffer, tq);
free(buffer->original_ptr);
delete buffer;
}
TAILQ_INIT(&this->write_queue); /* just ensures a valid tailq */
::free(this->read_buffer.buffer);
this->read_buffer.buffer = nullptr;
this->read_buffer.length = 0;
this->read_buffer.fill_count = 0;
}
if(!is_destructor_call && old_state != ConnectionState::DISCONNECTED)
this->client_handle->handle_connection_finalized();
}
void QueryClientConnection::handle_event_read(int fd, short events) {
constexpr auto buffer_length{1024 * 4};
uint8_t buffer[buffer_length];
auto length = read(fd, (void *) buffer, buffer_length);
if (length <= 0) {
if (errno == EINTR || errno == EAGAIN)
return;
else if (length == 0) {
logMessage(LOG_QUERY, "{} Connection closed (r). Client disconnected.",
CLIENT_STR_LOG_PREFIX_(this->client_handle));
} else {
logError(LOG_QUERY, "{} Failed to read! Code: {} errno: {} message: {}",
CLIENT_STR_LOG_PREFIX_(this->client_handle), length, errno, strerror(errno));
}
event_del_noblock(this->readEvent);
this->close_connection(std::chrono::system_clock::time_point{});
return;
}
if (this->connection_type_ == ConnectionType::PLAIN_TEXT) {
plain_text_buffer_insert:
this->handle_decoded_message(buffer, length);
} else if (this->connection_type_ == ConnectionType::SSL_ENCRYPTED) {
ssl_buffer_insert:;
this->ssl_handler.process_incoming_data(pipes::buffer_view{(const char*) buffer, (size_t) length});;
} else {
if (config::query::sslMode != 0 && pipes::SSL::isSSLHeader(std::string{(const char *) buffer, (size_t) length})) {
if(!this->initialize_ssl()) return;
/*
* - Content
* \x16
* -Version (1)
* \x03 \x00
* - length (2)
* \x00 \x04
*
* - Header
* \x00 -> hello request (3)
* \x05 -> length (4)
*/
//this->writeRawMessage(string("\x16\x03\x01\x00\x05\x00\x00\x00\x00\x00", 10));
goto ssl_buffer_insert;
} else {
this->connection_type_ = ConnectionType::PLAIN_TEXT;
this->client_handle->handle_connection_initialized();
goto plain_text_buffer_insert;
}
}
}
void QueryClientConnection::handle_event_write(int fd, short events) {
bool readd_write{false};
if(events & EV_WRITE) {
/* Safe to access, because we're only reading the queue and the head could never change. Only within the IO loop itself. */
WriteBuffer* wbuffer;
while((wbuffer = TAILQ_FIRST(&this->write_queue))) {
auto written = send(fd, wbuffer->ptr, wbuffer->length, 0);
if(written <= 0) {
if(errno == EAGAIN) {
readd_write = true;
break;
}
if(written == 0) {
logMessage(LOG_QUERY, "{} Connection closed (w). Client disconnected.", CLIENT_STR_LOG_PREFIX_(this->client_handle));
} else {
logError(LOG_QUERY, "{} Failed to write! Code: {} errno: {} message: {}", CLIENT_STR_LOG_PREFIX_(this->client_handle), written, errno, strerror(errno));
}
event_del_noblock(this->readEvent);
this->close_connection(std::chrono::system_clock::time_point{});
return;
}
wbuffer->length -= written;
if(wbuffer->length == 0) {
std::lock_guard block{this->buffer_lock};
TAILQ_REMOVE(&this->write_queue, wbuffer, tq);
::free(wbuffer->original_ptr);
delete wbuffer;
} else {
wbuffer->ptr += written;
}
}
}
if(this->connection_state == ConnectionState::DISCONNECTING) {
if(!readd_write || (events & EV_TIMEOUT)) {
/* disconnect timeouted or nothing more to write */
this->finalize(false);
return;
} else /* if(readd_write) */ { /* check not needed because tested before already */
auto time_left = this->disconnect_timeout - std::chrono::system_clock::now();
timeval timeout{0, 1};
if(time_left.count() > 0) {
timeout.tv_sec = std::chrono::floor<std::chrono::seconds>(time_left).count();
timeout.tv_usec = std::chrono::floor<std::chrono::microseconds>(time_left).count() % 1000000ULL;
}
event_add(this->writeEvent, &timeout);
}
} else if(readd_write) {
event_add(this->writeEvent, nullptr);
}
}
bool QueryClientConnection::initialize_ssl() {
this->connection_type_ = ConnectionType::SSL_ENCRYPTED;
this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_OUT, true);
this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_IN, true);
this->ssl_handler.callback_data([&](const pipes::buffer_view &buffer) {
this->handle_decoded_message(buffer.data_ptr<void>(), buffer.length());
});
this->ssl_handler.callback_write([&](const pipes::buffer_view &buffer) {
this->send_data_raw({buffer.data_ptr<char>(), buffer.length()});
});
this->ssl_handler.callback_initialized = [&] {
this->client_handle->handle_connection_initialized();
};
this->ssl_handler.callback_error([&](int code, const std::string& message) {
if(code == PERROR_SSL_ACCEPT) {
logError(LOG_QUERY, "{} Failed to initialize query ssl session ({})", CLIENT_STR_LOG_PREFIX_(this->client_handle), message);
this->close_connection(std::chrono::system_clock::time_point{});
} else if(code == PERROR_SSL_TIMEOUT) {
logError(LOG_QUERY, "{} Failed to initialize query ssl session (timeout: {})", CLIENT_STR_LOG_PREFIX_(this->client_handle), message);
this->close_connection(std::chrono::system_clock::time_point{});
} else
logError(LOG_QUERY, "{} Received SSL error ({} | {})", CLIENT_STR_LOG_PREFIX_(this->client_handle), code, message);
});
{
auto context = serverInstance->sslManager()->getQueryContext();
auto options = std::make_shared<pipes::SSL::Options>();
options->type = pipes::SSL::SERVER;
options->context_method = TLS_method();
options->default_keypair({context->privateKey, context->certificate});
if(!this->ssl_handler.initialize(options)) {
logError(LOG_QUERY, "[{}] Failed to setup ssl!", CLIENT_STR_LOG_PREFIX_(this->client_handle));
this->close_connection(std::chrono::system_clock::time_point{});
return false;
}
}
return true;
}
void QueryClientConnection::handle_decoded_message(const void *buffer, size_t size) {
{
std::lock_guard block{this->buffer_lock};
if((this->read_buffer.length - this->read_buffer.fill_count) < size) { /* !this->read_buffer.buffer is already implicitly implemented because by default read_buffer.length will be zero */
const auto new_size{this->read_buffer.length + size + 128};
auto new_buffer = ::malloc(new_size);
assert(new_buffer);
if(this->read_buffer.fill_count) memcpy(new_buffer, this->read_buffer.buffer, this->read_buffer.fill_count);
::free(this->read_buffer.buffer);
this->read_buffer.buffer = new_buffer;
this->read_buffer.length = new_size;
}
assert(this->read_buffer.buffer);
assert(this->read_buffer.length - this->read_buffer.fill_count >= size);
memcpy((char*) this->read_buffer.buffer + this->read_buffer.fill_count, buffer, size);
this->read_buffer.fill_count += size;
}
{
//TODO: Improve this command progress
auto qserver{this->client_handle->handle};
if(qserver) {
auto wlock{this->client_handle->_this};
qserver->executePool()->execute([wlock]() {
auto client{std::dynamic_pointer_cast<QueryClient>(wlock.lock())};
if(!client) return;
int counter = 0;
while(client->process_next_command() && counter++ < 15);
});
}
}
}
void QueryClientConnection::send_data(const std::string_view &buffer) {
if(this->connection_type_ == ConnectionType::PLAIN_TEXT)
this->send_data_raw(buffer);
else if(this->connection_type_ == ConnectionType::SSL_ENCRYPTED)
this->ssl_handler.send(pipes::buffer_view{buffer.data(), buffer.length()});
}
void QueryClientConnection::send_data_raw(const std::string_view &buffer) {
auto wbuf = new WriteBuffer{};
wbuf->original_ptr = (char*) malloc(buffer.length());
wbuf->ptr = wbuf->original_ptr;
memcpy(wbuf->ptr, buffer.data(), buffer.length());
wbuf->length = buffer.length();
{
std::lock_guard wlock{this->buffer_lock};
TAILQ_INSERT_TAIL(&this->write_queue, wbuf, tq);
}
{
std::lock_guard elock{this->event_mutex};
if(this->writeEvent)
event_add(this->writeEvent, nullptr);
}
}
void QueryClientConnection::close_connection(const std::chrono::system_clock::time_point &timeout) {
if(timeout.time_since_epoch().count() > 0) {
this->connection_state = ConnectionState::DISCONNECTING;
this->disconnect_timeout = timeout;
std::lock_guard elock{this->event_mutex};
if(this->writeEvent) {
event_add(this->writeEvent, nullptr);
return;
}
/* failed to add the write event, so call disconnect */
}
if(this->connection_state == ConnectionState::DISCONNECTED) return;
this->finalize(false);
}
void QueryClientConnection::enforce_text_connection() {
if(this->connection_state != ConnectionState::INITIALIZING) return;
this->connection_state = ConnectionState::CONNECTED;
this->connection_type_ = ConnectionType::PLAIN_TEXT;
this->client_handle->handle_connection_initialized();
}
CommandAssembleState QueryClientConnection::next_command(std::string &result) {
std::lock_guard block{this->buffer_lock};
auto new_line_idx = (char*) memchr(this->read_buffer.buffer, '\n', this->read_buffer.fill_count);
if(!new_line_idx) return CommandAssembleState::NO_COMMAND_PENDING;
const auto length = ((char*) this->read_buffer.buffer - new_line_idx) * sizeof(*new_line_idx);
auto line_length{length};
if(length > 0 && *(new_line_idx - 1) == '\r')
line_length--;
result.assign((char*) this->read_buffer.buffer, line_length);
//Do not copy the \r character
auto copy_bytes{this->read_buffer.fill_count - length};
if(copy_bytes > 0 && *(new_line_idx + 1) == '\r') {
copy_bytes--;
new_line_idx++;
}
memcpy(this->read_buffer.buffer, new_line_idx + 1, copy_bytes);
this->read_buffer.fill_count = copy_bytes;
return copy_bytes == 0 ? CommandAssembleState::SUCCESS : CommandAssembleState::MORE_COMMANDS_PENDING;
}

View File

@ -0,0 +1,98 @@
#pragma once
#include <chrono>
#include <event.h>
#include <deque>
#include <string>
#include <pipes/ssl.h>
#include <sys/queue.h>
namespace ts::server {
class QueryClient;
}
namespace ts::server::server::query {
enum struct ConnectionType {
UNKNOWN,
PLAIN_TEXT,
SSL_ENCRYPTED,
/* SSH */
};
enum struct ConnectionState {
INITIALIZING,
CONNECTED,
DISCONNECTING,
DISCONNECTED
};
enum struct CommandAssembleState {
SUCCESS,
MORE_COMMANDS_PENDING,
NO_COMMAND_PENDING
};
class QueryClientConnection {
public:
explicit QueryClientConnection(QueryClient* /* client */, int /* file descriptor */);
~QueryClientConnection();
[[nodiscard]] inline ConnectionType connection_type() const { return this->connection_type_; }
bool initialize(std::string& /* error */);
void add_read_event();
void finalize(bool /* is destructor call */);
void send_data(const std::string_view& /* payload */);
void send_data_raw(const std::string_view& /* payload */);
void enforce_text_connection();
[[nodiscard]] CommandAssembleState next_command(std::string& /* command */);
/* could be called from every thread (event IO thread) */
void close_connection(const std::chrono::system_clock::time_point& /* disconnect timeout */);
private:
struct WriteBuffer {
char* original_ptr;
char* ptr;
size_t length;
TAILQ_ENTRY(WriteBuffer) tq;
};
QueryClient* client_handle{nullptr};
ConnectionState connection_state{ConnectionState::INITIALIZING};
std::chrono::system_clock::time_point disconnect_timeout{};
ConnectionType connection_type_{ConnectionType::UNKNOWN};
int file_descriptor_{-1};
/* only delete the events within the event loop! */
std::mutex event_mutex{};
::event* readEvent{nullptr};
::event* writeEvent{nullptr};
pipes::SSL ssl_handler{};
std::mutex buffer_lock{};
struct {
void* buffer{nullptr};
size_t length{0};
size_t fill_count{0};
std::chrono::system_clock::time_point last_shrink{};
} read_buffer;
TAILQ_HEAD(, WriteBuffer) write_queue{};
void handle_event_write(int, short);
void handle_event_read(int, short);
bool initialize_ssl();
void handle_decoded_message(const void* /* message */, size_t /* length */);
};
}

View File

@ -7,11 +7,13 @@
#include <protocol/buffers.h>
#include <protocol/AcknowledgeManager.h>
#include <protocol/CompressionHandler.h>
#include <protocol/CryptHandler.cpp>
#include <protocol/CryptHandler.h>
#include "../../ConnectionStatistics.h"
using namespace ts;
using namespace ts::protocol;
using namespace ts::connection;
using namespace ts::server::server::udp;
PacketDecoder::PacketDecoder(ts::connection::CryptHandler *crypt_handler)
@ -69,7 +71,7 @@ PacketDecodeResult PacketDecoder::decode_incoming_data_(std::string& error, bool
/* handle the order stuff */
auto& fragment_buffer = this->_command_fragment_buffers[PacketDecoder::command_fragment_buffer_index(packet_parser.type())];
unique_lock queue_lock(fragment_buffer.buffer_lock);
std::unique_lock queue_lock(fragment_buffer.buffer_lock);
auto result = fragment_buffer.accept_index(packet_parser.packet_id());
if(result != 0) { /* packet index is ahead buffer index */
error = "pid: " + std::to_string(packet_parser.packet_id()) + ",";
@ -78,7 +80,7 @@ PacketDecodeResult PacketDecoder::decode_incoming_data_(std::string& error, bool
if(result == -1) { /* underflow */
/* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */
this->callback_send_acknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
this->callback_send_acknowledge(this->callback_argument, packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
return PacketDecodeResult::DUPLICATED_PACKET;
}
@ -145,7 +147,7 @@ PacketDecodeResult PacketDecoder::decode_incoming_data_(std::string& error, bool
};
{
unique_lock queue_lock(fragment_buffer.buffer_lock);
std::unique_lock queue_lock(fragment_buffer.buffer_lock);
if(!fragment_buffer.insert_index(packet_parser.packet_id(), std::move(fragment_entry)))
return PacketDecodeResult::COMMAND_INSTERT_FAILED;
@ -156,6 +158,8 @@ PacketDecodeResult PacketDecoder::decode_incoming_data_(std::string& error, bool
} else {
this->callback_decoded_packet(this->callback_argument, packet_parser);
}
return PacketDecodeResult::SUCCESS;
}
bool PacketDecoder::verify_encryption(const pipes::buffer_view &buffer) {
@ -169,14 +173,14 @@ bool PacketDecoder::verify_encryption(const pipes::buffer_view &buffer) {
CommandReassembleResult PacketDecoder::reassemble_command(pipes::buffer &result, bool &is_command_low) {
bool more_commands_pending{false};
command_fragment_buffer_t* buffer{nullptr};
unique_lock<std::recursive_timed_mutex> buffer_lock; /* general buffer lock */
std::unique_lock<std::recursive_timed_mutex> buffer_lock; /* general buffer lock */
{
//FIXME: Currently command low packets cant be handled if there is a command packet stuck in reassemble queue
/* handle commands before command low packets */
for(auto& buf : this->_command_fragment_buffers) {
unique_lock ring_lock(buf.buffer_lock, try_to_lock);
std::unique_lock ring_lock(buf.buffer_lock, std::try_to_lock);
if(!ring_lock.owns_lock()) continue;
if(buf.front_set()) {
@ -284,13 +288,13 @@ void PacketDecoder::force_insert_command(const pipes::buffer_view &buffer) {
{
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];
unique_lock queue_lock(fragment_buffer.buffer_lock);
std::unique_lock queue_lock(fragment_buffer.buffer_lock);
fragment_buffer.push_front(std::move(fragment_entry));
}
}
void PacketDecoder::register_initiv_packet() {
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];
unique_lock buffer_lock(fragment_buffer.buffer_lock);
std::unique_lock buffer_lock(fragment_buffer.buffer_lock);
fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */
}

View File

@ -30,7 +30,7 @@ void PacketEncoder::reset() {
category.queue.clear();
}
this->id_generator.reset();
this->id_generator_.reset();
}
bool PacketEncoder::encode_packet(const std::shared_ptr<protocol::ServerPacket> &original_packet, EncodeFlags flags) {
@ -191,7 +191,7 @@ PacketEncodeResult PacketEncoder::encode_packet_(std::string& error,
/* apply packet ids */
for(const auto& fragment : fragments) {
if(!fragment->memory_state.id_branded)
fragment->applyPacketId(this->id_generator);
fragment->applyPacketId(this->id_generator_);
}
work_lock.unlock(); /* the rest could be unordered */

View File

@ -83,6 +83,8 @@ namespace ts::server::server::udp {
bool encode_packet(const std::shared_ptr<protocol::ServerPacket> &/* the packet */, EncodeFlags /* flags */);
bool do_encode();
[[nodiscard]] inline protocol::PacketIdManager& id_generator() { return this->id_generator_; }
[[nodiscard]] inline std::shared_ptr<stats::ConnectionStatistics> get_statistics() { return this->statistics_; }
inline void set_statistics(const std::shared_ptr<stats::ConnectionStatistics>& stats) { this->statistics_ = stats; }
@ -111,7 +113,7 @@ namespace ts::server::server::udp {
/* ---------- Processing ---------- */
/* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */
protocol::PacketIdManager id_generator{};
protocol::PacketIdManager id_generator_{};
std::atomic<size_t> process_count{0};

View File

@ -4,4 +4,66 @@
#include "PingHandler.h"
using namespace ts::server::server::udp;
using namespace ts::server::server::udp;
void PingHandler::reset() {
this->last_ping_id = 0;
this->current_ping_ = std::chrono::milliseconds{0};
this->last_recovery_command_send = std::chrono::system_clock::time_point{};
this->last_command_acknowledge_ = std::chrono::system_clock::time_point{};
this->last_response_ = std::chrono::system_clock::time_point{};
this->last_request_ = std::chrono::system_clock::time_point{};
}
void PingHandler::received_pong(uint16_t ping_id) {
if(this->last_ping_id != ping_id) return;
auto now = std::chrono::system_clock::now();
this->current_ping_ = std::chrono::floor<std::chrono::milliseconds>(this->last_request_ - now);
this->last_response_ = now;
this->last_command_acknowledge_ = now; /* That's here for purpose!*/
}
void PingHandler::received_command_acknowledged() {
this->last_command_acknowledge_ = std::chrono::system_clock::now();
}
void PingHandler::tick(const std::chrono::system_clock::time_point& now) {
if(this->last_request_ + PingHandler::ping_request_interval < now)
this->send_ping_request(); /* may update last_response_ */
if(this->last_response_ + PingHandler::ping_timeout < now) {
if(this->last_recovery_command_send + PingHandler::recovery_request_interval < now)
this->send_recovery_request();
if(this->last_command_acknowledge_ + PingHandler::recovery_timeout < now) {
if(auto callback{this->callback_time_outed}; callback)
callback(this->callback_argument);
}
}
}
void PingHandler::send_ping_request() {
auto now = std::chrono::system_clock::now();
if(this->last_response_.time_since_epoch().count() == 0)
this->last_response_ = now;
this->last_request_ = now;
if(auto callback{this->callback_send_ping}; callback)
callback(this->callback_argument, this->last_ping_id);
}
void PingHandler::send_recovery_request() {
auto now = std::chrono::system_clock::now();
if(this->last_command_acknowledge_.time_since_epoch().count() == 0)
this->last_command_acknowledge_ = now;
this->last_recovery_command_send = now;
if(auto callback{this->callback_send_recovery_command}; callback)
callback(this->callback_argument);
}

View File

@ -12,10 +12,9 @@ namespace ts::server::server::udp {
void reset();
void tick();
void received_ping(uint16_t /* ping id */);
void command_packet_acknowledged();
void tick(const std::chrono::system_clock::time_point&);
void received_pong(uint16_t /* ping id */);
void received_command_acknowledged();
[[nodiscard]] inline std::chrono::milliseconds current_ping() const { return this->current_ping_; }
@ -24,12 +23,22 @@ namespace ts::server::server::udp {
callback_send_recovery_command_t callback_send_recovery_command{nullptr};
callback_time_outed_t callback_time_outed{nullptr};
private:
constexpr static std::chrono::milliseconds ping_request_interval{2500};
constexpr static std::chrono::milliseconds ping_timeout{10 * 1000};
constexpr static std::chrono::milliseconds recovery_request_interval{1000};
constexpr static std::chrono::milliseconds recovery_timeout{10 * 1000};
std::chrono::milliseconds current_ping_{0};
uint16_t last_ping_id{0};
std::chrono::milliseconds current_ping_{0};;
std::chrono::system_clock::time_point last_response_{};
std::chrono::system_clock::time_point last_request_{};
std::chrono::system_clock::time_point last_command_packet_{};
std::chrono::system_clock::time_point last_command_acknowledge_{};
std::chrono::system_clock::time_point last_recovery_command_send{};
void send_ping_request();
void send_recovery_request();
};
}

View File

@ -1,22 +1,19 @@
#include <algorithm>
#include <memory>
#include <tomcrypt.h>
#include <arpa/inet.h>
#include <ThreadPool/Timer.h>
#include <misc/endianness.h>
#include <misc/memtracker.h>
#include <log/LogUtils.h>
#include "VoiceClient.h"
#include "src/VirtualServer.h"
#include "../../server/VoiceServer.h"
using namespace std;
using namespace std::chrono;
using namespace ts::server;
using namespace ts::protocol;
VoiceClient::VoiceClient(const std::shared_ptr<VoiceServer>& server, const sockaddr_storage* address) : SpeakingClient(server->server->sql, server->server), voice_server(server) {
VoiceClient::VoiceClient(const std::shared_ptr<VirtualServer>& server, const sockaddr_storage* address) : SpeakingClient(server->sql, server) {
assert(address);
memtrack::allocated<VoiceClient>(this);
memcpy(&this->remote_address, address, sizeof(sockaddr_storage));
@ -24,27 +21,21 @@ VoiceClient::VoiceClient(const std::shared_ptr<VoiceServer>& server, const socka
debugMessage(this->server->getServerId(), " Creating VoiceClient instance at {}", (void*) this);
}
void VoiceClient::initialize() {
this->event_handle_packet = make_shared<event::ProxiedEventEntry<VoiceClient>>(dynamic_pointer_cast<VoiceClient>(this->ref()), &VoiceClient::execute_handle_packet);
VoiceClient::~VoiceClient() {
debugMessage(this->getServerId(), " Deleting VoiceClient instance at {}", (void*) this);
this->state = ClientState::DISCONNECTED;
memtrack::freed<VoiceClient>(this);
}
void VoiceClient::initialize(const std::shared_ptr<connection::VoiceClientConnection> &connection) {
assert(connection);
this->connection_ = connection;
this->properties()[property::CLIENT_TYPE] = ClientType::CLIENT_TEAMSPEAK;
this->properties()[property::CLIENT_TYPE_EXACT] = ClientType::CLIENT_TEAMSPEAK;
this->state = ConnectionState::INIT_HIGH;
this->connection = new connection::VoiceClientConnection(this);
}
VoiceClient::~VoiceClient() {
debugMessage(this->getServerId(), " Deleting VoiceClient instance at {}", (void*) this);
this->state = ConnectionState::DISCONNECTED;
delete this->connection;
this->connection = nullptr;
if(this->flushing_thread)
logCritical(this->getServerId(), "Deleting a VoiceClient which should still be hold within the flush thread!");
memtrack::freed<VoiceClient>(this);
this->state = ClientState::INITIALIZING;
}
void VoiceClient::sendCommand0(const std::string_view& cmd, bool low, bool direct, std::unique_ptr<threads::Future<bool>> listener) {
@ -61,60 +52,26 @@ void VoiceClient::sendCommand0(const std::string_view& cmd, bool low, bool direc
packet->enable_flag(protocol::PacketFlag::NewProtocol);
}
packet->setListener(std::move(listener));
this->connection->send_packet(packet, false, direct);
if(auto connection{this->connection_}; connection)
connection->send_packet(packet, false, direct);
#ifdef PKT_LOG_CMD
logTrace(this->getServerId(), "{}[Command][Server -> Client] Sending command {}. Command low: {}. Full command: {}", CLIENT_STR_LOG_PREFIX, cmd.substr(0, cmd.find(' ')), low, cmd);
#endif
}
void VoiceClient::sendAcknowledge(uint16_t packetId, bool low) {
char buffer[2];
le2be16(packetId, buffer);
auto packet = make_shared<protocol::ServerPacket>(low ? protocol::PacketTypeInfo::AckLow : protocol::PacketTypeInfo::Ack, pipes::buffer_view{buffer, 2});
packet->enable_flag(PacketFlag::Unencrypted);
if(!low) packet->enable_flag(protocol::PacketFlag::NewProtocol);
this->connection->send_packet(packet);
#ifdef PKT_LOG_ACK
logTrace(this->getServerId(), "{}[Acknowledge][Server -> Client] Sending acknowledge for {}", CLIENT_STR_LOG_PREFIX, packetId);
#endif
}
void VoiceClient::tick(const std::chrono::system_clock::time_point &time) {
SpeakingClient::tick(time);
this->connection->tick();
{
ALARM_TIMER(A1, "VoiceClient::tick", milliseconds(3));
if(this->state == ConnectionState::CONNECTED) {
if(this->lastPingRequest > this->lastPingResponse) { //Client is behind :)
if(this->lastPingRequest - this->lastPingResponse > chrono::seconds(20)) {
debugMessage(this->getServerId(), "{} Got a ping timeout. (Last successful ping: {}ms ago. Last request {}ms. Last response {}ms). Trying to recover via command acknowledge.",
CLIENT_STR_LOG_PREFIX,
duration_cast<milliseconds>(this->lastPingRequest - this->lastPingResponse).count(),
duration_cast<milliseconds>(time - this->lastPingRequest).count(),
duration_cast<milliseconds>(time - this->lastPingResponse).count());
bool force;
this->request_connection_info(nullptr, force);
this->lastPingResponse = system_clock::now();
return;
}
}
if(time - this->lastPingRequest >= chrono::milliseconds(1000)) {
//TODO calculate the ping smooth
if(this->lastPingResponse < this->lastPingRequest){
if(time - this->lastPingRequest >= chrono::milliseconds(1500)) { //Max
this->sendPingRequest();
}
} else
this->sendPingRequest();
}
} else if(this->state == ConnectionState::INIT_LOW || this->state == ConnectionState::INIT_HIGH) {
//TODO!
if(this->state == ClientState::INITIALIZING) {
if(this->last_packet_handshake.time_since_epoch().count() != 0) {
if(time - this->last_packet_handshake > seconds(5)) {
debugMessage(this->getServerId(), "{} Got handshake timeout. {}. State: {} Time: {}", CLIENT_STR_LOG_PREFIX,
debugMessage(this->getServerId(), "{} Got initialize timeout. {}. Time: {}", CLIENT_STR_LOG_PREFIX,
this->getLoggingPeerIp() + ":" + to_string(this->getPeerPort()),
this->state == ConnectionState::INIT_HIGH ? "INIT_HIGH" : "INIT_LOW",
duration_cast<seconds>(time - this->last_packet_handshake).count()
);
this->close_connection(system_clock::now() + seconds(1));
@ -134,18 +91,18 @@ bool VoiceClient::disconnect(ts::ViewReasonId reason_id, const std::string &reas
* Its only for the clients own flavour and everything which the client receives after will be ignored :)
*/
ConnectionState old_state{};
ClientState old_state{};
{
std::lock_guard state_lock{this->state_lock};
if(this->state == ConnectionState::DISCONNECTING || this->state == ConnectionState::DISCONNECTED)
if(this->state == ClientState::DISCONNECTED)
return false; //Already disconnecting/disconnected
old_state = this->state;
this->state = ConnectionState::DISCONNECTING;
this->state = ClientState::DISCONNECTED;
}
if(old_state == ConnectionState::CONNECTED) {
/* Client has been successflly initialized; Send normal disconnect. */
if(old_state == ClientState::CONNECTED) {
/* Client has been successfully initialized; Send normal disconnect. */
Command cmd("notifyclientleftview");
cmd["reasonmsg"] = reason;
@ -194,120 +151,67 @@ bool VoiceClient::disconnect(ts::ViewReasonId reason_id, const std::string &reas
}
bool VoiceClient::close_connection(const system_clock::time_point &timeout) {
auto self_lock = dynamic_pointer_cast<VoiceClient>(_this.lock());
assert(self_lock); //Should never happen!
bool flush = timeout.time_since_epoch().count() > 0;
{
std::lock_guard state_lock{this->state_lock};
if(this->state == ConnectionState::DISCONNECTED) return false;
else if(this->state == ConnectionState::DISCONNECTING) {
/* here is nothing to pay attention for */
} else if(this->state == ConnectionState::DISCONNECTING_FLUSHING) {
if(!flush) {
this->state = ConnectionState::DISCONNECTED;
return true; /* the flush thread will execute the final disconnect */
} else {
//TODO: May update the flush timeout if its less then the other one?
return true;
}
}
this->state = flush ? ConnectionState::DISCONNECTING_FLUSHING : ConnectionState::DISCONNECTED;
this->state = ClientState::DISCONNECTED;
}
debugMessage(this->getServerId(), "{} Closing voice client connection. (Flush: {})", CLIENT_STR_LOG_PREFIX, flush);
//TODO: Move this out into a thread pool?
this->flushing_thread = std::make_shared<threads::Thread>(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [this, self_lock, timeout, flush]{
{
/* Await that all commands have been processed. It does not make sense to unregister the client while command handling. */
std::lock_guard cmd_lock{this->command_lock};
}
if(flush) {
debugMessage(this->getServerId(), "{} Awaiting write prepare, write and acknowledge queue flushed", CLIENT_STR_LOG_PREFIX);
while(this->state == DISCONNECTING_FLUSHING) {
if(system_clock::now() > timeout){
auto write_queue_flushed = this->connection->wait_empty_write_and_prepare_queue(timeout);
auto acknowledge_received = connection->acknowledge_handler.awaiting_acknowledge() == 0;
if(write_queue_flushed && acknowledge_received)
break;
debugMessage(this->getServerId(), "{} Failed to flush pending messages. Acknowledges pending: {} Buffers pending: {}", CLIENT_STR_LOG_PREFIX, acknowledge_received, write_queue_flushed);
break;
}
if(!this->connection->wait_empty_write_and_prepare_queue(timeout))
continue;
if(connection->acknowledge_handler.awaiting_acknowledge() > 0) {
usleep(5000);
continue;
}
debugMessage(this->getServerId(), "{} Write and acknowledge queue are flushed", CLIENT_STR_LOG_PREFIX);
break;
}
}
if(this->state > DISCONNECTING) /* it could happen that the client "reconnects" while flushing this shit */
this->finalDisconnect();
});
flushing_thread->name("Flush thread VC").execute();
auto connection = std::exchange(this->connection_, nullptr);
if(connection) connection->close_connection(timeout);
return true;
}
void VoiceClient::finalDisconnect() {
void VoiceClient::finalize() {
/* there could not happen any IO while we're doing finalize()! */
auto ownLock = dynamic_pointer_cast<VoiceClient>(_this.lock());
assert(ownLock);
lock_guard disconnect_lock_final(this->finalDisconnectLock);
#if 0
if(this->final_disconnected) {
logError(this->getServerId(), "Tried to final disconnect {}/{} twice", this->getLoggingPeerIp() + ":" + to_string(this->getPeerPort()), this->getDisplayName());
return;
}
this->final_disconnected = true;
this->state = ConnectionState::DISCONNECTED;
#endif
this->state = ClientState::DISCONNECTED;
threads::MutexLock command_lock(this->command_lock); //We should not progress any commands while disconnecting
//Unload manager cache
this->processLeave();
{
if(this->flushing_thread) this->flushing_thread->detach(); //The thread itself should be already done or executing this method
this->flushing_thread.reset();
}
if(this->voice_server) this->voice_server->unregisterConnection(ownLock);
}
void VoiceClient::execute_handle_packet(const std::chrono::system_clock::time_point &time) {
this->connection->execute_handle_command_packets(time);
}
void VoiceClient::send_voice_packet(const pipes::buffer_view &voice_buffer, const SpeakingClient::VoicePacketFlags &flags) {
auto packet = make_shared<ServerPacket>(PacketTypeInfo::Voice, voice_buffer.length());
{
PacketFlag::PacketFlags packet_flags = PacketFlag::None;
packet_flags |= flags.encrypted ? 0 : PacketFlag::Unencrypted;
packet_flags |= flags.head ? PacketFlag::Compressed : 0;
packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0;
packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0;
packet_flags |= flags.encrypted ? 0U : PacketFlag::Unencrypted;
packet_flags |= flags.head ? PacketFlag::Compressed : 0U;
packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0U;
packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0U;
packet->set_flags(packet_flags);
}
memcpy(packet->data().data_ptr<void>(), voice_buffer.data_ptr<void>(), voice_buffer.length());
this->connection->send_packet(packet, false, false);
if(auto connection{this->connection_}; connection)
connection->send_packet(packet, false, false);
}
void VoiceClient::send_voice_whisper_packet(const pipes::buffer_view &voice_buffer, const SpeakingClient::VoicePacketFlags &flags) {
auto packet = make_shared<ServerPacket>(PacketTypeInfo::VoiceWhisper, voice_buffer.length());
{
PacketFlag::PacketFlags packet_flags = PacketFlag::None;
packet_flags |= flags.encrypted ? 0 : PacketFlag::Unencrypted;
packet_flags |= flags.head ? PacketFlag::Compressed : 0;
packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0;
packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0;
packet_flags |= flags.encrypted ? 0U : PacketFlag::Unencrypted;
packet_flags |= flags.head ? PacketFlag::Compressed : 0U;
packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0U;
packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0U;
packet->set_flags(packet_flags);
}
memcpy(packet->data().data_ptr<void>(), voice_buffer.data_ptr<void>(), voice_buffer.length());
this->connection->send_packet(packet, false, false);
if(auto connection{this->connection_}; connection)
connection->send_packet(packet, false, false);
}
std::chrono::milliseconds VoiceClient::calculatePing() {
auto connection = this->connection_;
return connection ? connection->ping_handler_.current_ping() : std::chrono::milliseconds{0};
}

View File

@ -7,7 +7,6 @@
#include <netinet/in.h>
#include <deque>
#include <cstdint>
#include <src/server/VoiceServer.h>
#include <EventLoop.h>
#include "../SpeakingClient.h"
#include "../ConnectedClient.h"
@ -42,12 +41,10 @@ namespace ts {
class VoiceClient : public SpeakingClient {
friend class VirtualServer;
friend class VoiceServer;
friend class ts::server::server::udp::POWHandler;
friend class ts::connection::VoiceClientConnection;
friend class ConnectedClient;
friend class io::IOServerHandler;
public:
VoiceClient(const std::shared_ptr<VoiceServer>& server,const sockaddr_storage*);
VoiceClient(const std::shared_ptr<VirtualServer>& server,const sockaddr_storage*);
~VoiceClient();
bool close_connection(const std::chrono::system_clock::time_point &timeout) override;
@ -60,43 +57,29 @@ namespace ts {
/* Note: Order is only guaranteed if progressDirectly is on! */
virtual void sendCommand0(const std::string_view& /* data */, bool low = false, bool progressDirectly = false, std::unique_ptr<threads::Future<bool>> listener = nullptr);
connection::VoiceClientConnection* getConnection(){ return connection; }
std::shared_ptr<VoiceServer> getVoiceServer(){ return voice_server; }
std::chrono::milliseconds calculatePing(){ return ping; }
/* the connection might be null! */
[[nodiscard]] inline auto connection() { return this->connection_; }
[[nodiscard]] std::chrono::milliseconds calculatePing();
private:
connection::VoiceClientConnection* connection;
std::shared_ptr<connection::VoiceClientConnection> connection_{nullptr};
protected:
std::shared_ptr<VoiceServer> voice_server;
void initialize(const std::shared_ptr<connection::VoiceClientConnection>& /* connection */);
void finalize();
void initialize();
virtual void tick(const std::chrono::system_clock::time_point &time) override;
void handlePacketCommand(const pipes::buffer_view&);
//Handshake helpers
public:
void send_voice_packet(const pipes::buffer_view &packet, const VoicePacketFlags &flags) override;
void send_voice_whisper_packet(const pipes::buffer_view &packet, const VoicePacketFlags &flags) override;
protected:
void handlePacketCommand(const pipes::buffer_view&);
virtual command_result handleCommand(Command &command) override;
//Ping/pong
uint16_t lastPingId = 0;
std::chrono::milliseconds ping = std::chrono::milliseconds(0);
std::chrono::system_clock::time_point lastPingResponse;
std::chrono::system_clock::time_point lastPingRequest;
std::chrono::system_clock::time_point last_packet_handshake;
private:
int socket = 0;
io::pktinfo_storage address_info;
void finalDisconnect();
bool final_disconnected = false;
//General TS3 manager commands
command_result handleCommandClientInitIv(Command&);
@ -104,9 +87,6 @@ namespace ts {
command_result handleCommandClientInit(Command&) override;
command_result handleCommandClientDisconnect(Command&);
//Locked by finalDisconnect, disconnect and close connection
std::shared_ptr<threads::Thread> flushing_thread;
struct {
bool client_init = false;
bool new_protocol = false;
@ -118,8 +98,11 @@ namespace ts {
std::shared_ptr<ecc_key> remote_key;
} crypto;
std::shared_ptr<event::ProxiedEventEntry<VoiceClient>> event_handle_packet;
void execute_handle_packet(const std::chrono::system_clock::time_point& /* scheduled */);
enum struct CryptoHandshakeState {
INITEV,
CLIENT_EK,
DONE
} crypto_handshake_state{CryptoHandshakeState::INITEV};
};
}
}

View File

@ -17,10 +17,10 @@ using namespace ts;
command_result VoiceClient::handleCommand(ts::Command &command) {
threads::MutexLock l2(this->command_lock);
if(this->state == ConnectionState::DISCONNECTED) return command_result{error::client_not_logged_in};
if(this->state == ClientState::DISCONNECTED) return command_result{error::client_not_logged_in};
if(!this->voice_server) return command_result{error::server_unbound};
if(this->state == ConnectionState::INIT_HIGH && this->handshake.state == HandshakeState::SUCCEEDED) {
if(this->state == ClientState::INITIALIZING && this->crypto_handshake_state == CryptoHandshakeState::DONE) {
if(command.command() == "clientinit")
return this->handleCommandClientInit(command);
} else if(command.command() == "clientdisconnect")

View File

@ -1,12 +1,13 @@
#include <misc/endianness.h>
#include <algorithm>
#include <log/LogUtils.h>
#include "../../server/VoiceServer.h"
#include <misc/memtracker.h>
#include <protocol/Packet.h>
#include <ThreadPool/Timer.h>
#include "./VoiceClientConnection.h"
#include "./VoiceClient.h"
#include "src/server/udp-server/UDPServer.h"
#include "src/InstanceHandler.h"
using namespace std;
using namespace std::chrono;
@ -15,7 +16,9 @@ using namespace ts::connection;
using namespace ts::protocol;
using namespace ts::server;
VoiceClientConnection::VoiceClientConnection(VoiceClient* client) :
VoiceClientConnection::VoiceClientConnection(server::server::udp::Server* server, const std::shared_ptr<server::VoiceClient>& client, int socket) :
socket{socket},
udp_server{server},
packet_encoder_{&this->crypt_handler, &this->compress_handler, &this->acknowledge_handler},
packet_decoder_{&this->crypt_handler} {
memtrack::allocated<VoiceClientConnection>(this);
@ -43,15 +46,26 @@ VoiceClientConnection::VoiceClientConnection(VoiceClient* client) :
this->ping_handler_.callback_send_ping = [](auto _this, auto& a1) {
reinterpret_cast<VoiceClientConnection*>(_this)->send_packet_ping(a1);
};
//TODO: The two other callbacks!
this->ping_handler_.callback_send_recovery_command = [](auto _this) {
reinterpret_cast<VoiceClientConnection*>(_this)->send_packet_ping_recovery();
};
this->ping_handler_.callback_time_outed = [](auto _this) {
reinterpret_cast<VoiceClientConnection*>(_this)->handle_ping_timeout();
};
this->server_id = client->getServerId();
this->client_handle = client;
this->client_handle_ = client;
this->crypt_handler.reset();
debugMessage(this->server_id, "Allocated new voice client connection at {}", (void*) this);
}
void VoiceClientConnection::initialize(const std::shared_ptr<VoiceClientConnection> &self) {
assert(&*self == this);
this->weak_this = self;
this->event_handle_packet = make_shared<event::ProxiedEventEntry<VoiceClientConnection>>(self, &VoiceClientConnection::execute_handle_command_packets);
}
VoiceClientConnection::~VoiceClientConnection() {
debugMessage(this->server_id, "Deleted voice client connection at {}", (void*) this);
@ -61,16 +75,25 @@ VoiceClientConnection::~VoiceClientConnection() {
this->write_queue.clear();
}
this->client_handle = nullptr;
this->client_handle_ = nullptr;
memtrack::freed<VoiceClientConnection>(this);
}
void VoiceClientConnection::register_client_for_write() {
std::shared_lock client_lock{this->client_mutex};
if(!this->client_handle) return;
void VoiceClientConnection::register_for_write() {
auto self = this->weak_this.lock();
assert(self);
this->udp_server->schedule_client_write(self);
}
if(this->client_handle->voice_server)
this->client_handle->voice_server->triggerWrite(dynamic_pointer_cast<VoiceClient>(this->client_handle->_this.lock()));
void VoiceClientConnection::register_for_command_handling() {
auto vmanager = serverInstance->getVoiceServerManager();
if(!vmanager)
return;
auto evloop = vmanager->get_executor_loop();
if(!evloop)
return;
evloop->schedule(this->event_handle_packet);
}
#ifdef CLIENT_LOG_PREFIX
@ -82,14 +105,7 @@ void VoiceClientConnection::register_client_for_write() {
void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& buffer) {
auto command_pending = this->packet_decoder_.decode_incoming_data(buffer);
if(command_pending) {
std::shared_lock clock{this->client_mutex};
if(!this->client_handle) return; //TODO: Warn etc?
auto voice_server = this->client_handle->voice_server;
if(voice_server)
voice_server->schedule_command_handling(this->client_handle);
}
if(command_pending) this->register_for_command_handling();
}
bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer) {
@ -98,20 +114,27 @@ bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer)
void VoiceClientConnection::handle_decoded_packet(const ts::protocol::ClientPacketParser &packet) {
auto packet_type = packet.type();
if(packet_type == PacketType::VOICE ) {
std::shared_lock clock{this->client_mutex};
if(!this->client_handle) return; //TODO: Warn etc?
if(packet_type == PacketType::VOICE) {
auto client = this->client_handle_;
if(!client) [[unlikely]] {
logWarning(this->server_id, "Received voice data for client, but we've no client associated with the connection.");
return;
}
this->client_handle->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0);
client->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0);
} else if(packet_type == PacketType::VOICE_WHISPER) {
std::shared_lock clock{this->client_mutex};
if(!this->client_handle) return; //TODO: Warn etc?
auto client = this->client_handle_;
if(!client) [[unlikely]] {
logWarning(this->server_id, "Received voice whisper data for client, but we've no client associated with the connection.");
return;
}
this->client_handle->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0);
client->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0);
} else if(packet_type == PacketType::ACK || packet_type == PacketType::ACK_LOW) {
string error{};
if(!this->acknowledge_handler.process_acknowledge(packet.type(), packet.payload(), error))
debugMessage(this->server_id, "{} Failed to handle acknowledge: {}", this->client_log_prefix(), error);
this->ping_handler_.received_command_acknowledged();
} else if(packet_type == PacketType::PING) {
/* just send a pong response */
char buffer[2];
@ -122,8 +145,7 @@ void VoiceClientConnection::handle_decoded_packet(const ts::protocol::ClientPack
} else if(packet_type == PacketType::PONG) {
if(packet.payload_length() < 2) return;
uint16_t ping_id = be2le16((char*) packet.payload().data_ptr());
//TODO: Ping handler handle ping
this->ping_handler_.received_pong(be2le16((char*) packet.payload().data_ptr()));
} else if(packet_type == PacketType::COMMAND || packet_type == PacketType::COMMAND_LOW) {
logCritical(this->server_id, "{} Received command packet within handle_decoded_packet callback.", this->client_log_prefix());
} else if(packet_type == PacketType::INIT1) {
@ -188,12 +210,22 @@ void VoiceClientConnection::send_packet_ping(uint16_t& ping_id) {
ping_id = packet->packetId();
}
void VoiceClientConnection::send_packet_ping_recovery() {
const char* command = "notifyserverpingrecovery";
auto packet = make_shared<protocol::ServerPacket>(protocol::PacketTypeInfo::Command,
pipes::buffer_view{(void*) command, strlen(command)}
);
this->send_packet(packet);
}
void VoiceClientConnection::execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */) {
if((int) this->connection_state_ >= (int) ClientConnectionState::DISCONNECTING)
return;
std::shared_lock clock{this->client_handle};
if(!this->client_handle) return; //TODO: Warn etc?
auto client = this->client_handle_;
if(!client) [[unlikely]]
return;
std::lock_guard clock{client->command_lock};
using CommandReassembleResult = ts::server::server::udp::CommandReassembleResult;
@ -218,7 +250,7 @@ void VoiceClientConnection::execute_handle_command_packets(const std::chrono::sy
auto startTime = system_clock::now();
try {
this->client_handle->handlePacketCommand(payload);
client->handlePacketCommand(payload);
} catch (std::exception& ex) {
logCritical(this->server_id, "{} An exception has been thrown within command handling, which reached to root handler. This should not happen! (Message: {})", this->client_log_prefix(), ex.what());
}
@ -237,11 +269,8 @@ void VoiceClientConnection::execute_handle_command_packets(const std::chrono::sy
);
}
if(command_status == CommandReassembleResult::MORE_COMMANDS_PENDING) {
auto voice_server = this->client_handle->voice_server;
if(voice_server)
voice_server->schedule_command_handling(this->client_handle);
}
if(command_status == CommandReassembleResult::MORE_COMMANDS_PENDING)
this->register_for_command_handling();
}
@ -257,7 +286,7 @@ void VoiceClientConnection::send_packet(const shared_ptr<protocol::ServerPacket>
flags |= (unsigned) EncodeFlags::sync;
if(this->packet_encoder_.encode_packet(original_packet, (EncodeFlags) flags))
this->register_client_for_write();
this->register_for_write();
}
void VoiceClientConnection::handle_encode_error(const shared_ptr<protocol::ServerPacket> &packet,
@ -290,7 +319,7 @@ void VoiceClientConnection::handle_encoded_buffers(const std::vector<pipes::buff
std::lock_guard lock{this->write_queue_lock};
this->write_queue.insert(this->write_queue.begin(), buffers.begin(), buffers.end());
}
this->register_client_for_write();
this->register_for_write();
}
bool VoiceClientConnection::encode_packets() {
@ -321,41 +350,6 @@ WriteBufferStatus VoiceClientConnection::pop_write_buffer(pipes::buffer& target)
return size > 1 ? WriteBufferStatus::BUFFERS_LEFT : WriteBufferStatus::EMPTY;
}
bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) {
while(true) {
#if 0
for(auto& queue : this->write_preprocess_queues) {
{
lock_guard lock{queue.queue_lock};
if(!queue.queue.empty())
goto _wait;
}
{
unique_lock lock{queue.work_lock, try_to_lock};
if(!lock.owns_lock())
goto _wait;
}
}
{
lock_guard buffer_lock{this->write_queue_lock};
if(!this->write_queue.empty())
goto _wait;
if(this->prepare_process_count != 0)
goto _wait;
}
#endif
break;
_wait:
if(until.time_since_epoch().count() != 0 && system_clock::now() > until)
return false;
threads::self::sleep_for(milliseconds(5));
}
return true;
}
void VoiceClientConnection::reset() {
this->packet_encoder_.reset();
this->packet_decoder_.reset();
@ -366,15 +360,28 @@ void VoiceClientConnection::reset() {
void VoiceClientConnection::force_insert_command(const pipes::buffer_view &buffer) {
this->packet_decoder_.force_insert_command(buffer);
this->register_for_command_handling();
}
std::shared_lock clock{this->client_handle};
if(!this->client_handle) return; //TODO: Warn etc?
void VoiceClientConnection::close_connection(const std::chrono::system_clock::time_point &timeout) {
//TODO!
if(timeout.time_since_epoch().count() > 0) {
auto voice_server = this->client_handle->voice_server;
if(voice_server)
voice_server->schedule_command_handling(this->client_handle);
} else {
this->connection_state_ = ClientConnectionState::DISCONNECTED;
/* Unregister connection from server */
}
}
void VoiceClientConnection::tick() {
//TODO: Tick ping handler
auto now = std::chrono::system_clock::now();
this->ping_handler_.tick(now);
if(this->connection_state_ == ClientConnectionState::DISCONNECTING) {
//TODO!
if(now > this->disconnect_timeout_) {
//TODO!
}
}
}

View File

@ -30,7 +30,9 @@
namespace ts {
namespace server {
class VoiceClient;
class VoiceServer;
namespace server::udp {
class Server;
}
}
namespace connection {
@ -44,7 +46,7 @@ namespace ts {
};
enum struct ClientConnectionState {
INITIALITZING, /* crypto setup */
INITIALIZING, /* crypto setup */
CONNECTED, /* basic connection has been established */
DISCONNECTING, /* connection is already disconnecting */
DISCONNECTED /* connection has been (maybe successfully) closed */
@ -54,12 +56,19 @@ namespace ts {
friend class server::VoiceServer;
friend class server::VoiceClient;
public:
explicit VoiceClientConnection(server::VoiceClient*);
explicit VoiceClientConnection(server::server::udp::Server*, const std::shared_ptr<server::VoiceClient>&, int /* socket */);
virtual ~VoiceClientConnection();
void initialize(const std::shared_ptr<VoiceClientConnection>& /* self */);
[[nodiscard]] inline CryptHandler* getCryptHandler(){ return &crypt_handler; }
//[[nodiscard]] inline server::VoiceClient* getClient(){ return client; }
[[nodiscard]] inline server::server::udp::PacketEncoder& packet_encoder() { return this->packet_encoder_; }
[[nodiscard]] inline server::server::udp::PacketDecoder& packet_decoder() { return this->packet_decoder_; }
[[nodiscard]] inline ClientConnectionState connection_state() const { return this->connection_state_; }
void send_packet(const std::shared_ptr<protocol::ServerPacket>& original_packet, bool copy = false, bool prepare_directly = false);
/*
@ -68,31 +77,38 @@ namespace ts {
*/
bool encode_packets();
[[nodiscard]] inline server::server::udp::PacketEncoder& packet_encoder() { return this->packet_encoder_; }
[[nodiscard]] inline server::server::udp::PacketDecoder& packet_decoder() { return this->packet_decoder_; }
/* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */
[[nodiscard]] WriteBufferStatus pop_write_buffer(pipes::buffer& /* buffer */);
bool wait_empty_write_and_prepare_queue(std::chrono::time_point<std::chrono::system_clock> until = std::chrono::time_point<std::chrono::system_clock>());
/* a flush timout less than now will cause the client to close the connection instantly */
void close_connection(const std::chrono::system_clock::time_point& /* flush timeout */);
void reset();
void tick();
void tick(); /* called via the UDP server tick */
void force_insert_command(const pipes::buffer_view& /* payload */);
void send_packet_acknowledge(uint16_t /* packet id */, bool /* is command low */);
void send_packet_ping(uint16_t& /* ping id */);
void send_packet_ping_recovery();
protected:
void handle_incoming_datagram(const pipes::buffer_view &buffer);
bool verify_encryption(const pipes::buffer_view& /* full packet */);
void register_client_for_write();
void register_for_write();
void register_for_command_handling();
private:
VirtualServerId server_id{0};
std::shared_mutex client_mutex{};
server::VoiceClient* client_handle{nullptr};
std::weak_ptr<VoiceClientConnection> weak_this{};
ClientConnectionState connection_state_{ClientConnectionState::INITIALITZING};
server::server::udp::Server* udp_server;
int socket{0};
io::pktinfo_storage address_info{};
VirtualServerId server_id{0};
/* may change at any given time. */
std::shared_ptr<server::VoiceClient> client_handle_{nullptr};
ClientConnectionState connection_state_{ClientConnectionState::INITIALIZING};
std::chrono::system_clock::time_point disconnect_timeout_{};
CryptHandler crypt_handler{};
CompressionHandler compress_handler{};
@ -105,6 +121,10 @@ namespace ts {
server::server::udp::PacketEncoder packet_encoder_;
server::server::udp::PacketDecoder packet_decoder_;
server::server::udp::PingHandler ping_handler_{};
std::shared_ptr<event::ProxiedEventEntry<VoiceClientConnection>> event_handle_packet;
//Handle stuff
[[nodiscard]] std::string client_log_prefix();
void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */);
@ -116,6 +136,8 @@ namespace ts {
/* will be called on the IO thread */
void handle_decoded_packet(const protocol::ClientPacketParser&);
void handle_decode_error(ts::server::server::udp::PacketDecodeResult /* error */, const std::string& /* custom message */);
void handle_ping_timeout();
};
}
}

View File

@ -23,10 +23,13 @@ inline void generate_random(uint8_t *destination, size_t length) {
}
ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) {
auto connection = this->connection_;
if(!connection) return ts::command_result{error::vs_critical};
this->last_packet_handshake = system_clock::now();
std::unique_lock state_lock{this->state_lock};
if(this->state == ConnectionState::CONNECTED) { /* we've a reconnect */
if(this->state == ClientState::CONNECTED) { /* we've a reconnect */
if(system_clock::now() - this->lastPingResponse < seconds(5)) {
logMessage(this->getServerId(), "{} Client initialized session reconnect, but last ping response is not older then 5 seconds ({}). Ignoring attempt",
CLIENT_STR_LOG_PREFIX,
@ -53,24 +56,24 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) {
this->server->client_move(this->ref(), nullptr, nullptr, config::messages::timeout::connection_reinitialized, ViewReasonId::VREASON_TIMEOUT, false, server_channel_lock);
}
this->finalDisconnect();
this->finalize();
state_lock.lock();
} else if(this->state >= ConnectionState::DISCONNECTING) {
state_lock.unlock();
std::shared_lock disconnect_finish{this->finalDisconnectLock}; /* await until the last disconnect has been processed */
state_lock.lock();
this->state = ConnectionState::INIT_HIGH;
} else if(this->state == ConnectionState::INIT_HIGH) {
logTrace(this->getServerId(), "{} Received a duplicated initiv. It seems like our initivexpand2 hasn't yet reached the client. The acknowledge handler should handle this issue for us.", CLIENT_STR_LOG_PREFIX);
return command_result{error::ok};
} else if(this->state == ClientState::DISCONNECTED) {
this->state = ClientState::INITIALIZING;
this->crypto_handshake_state = CryptoHandshakeState::INITEV;
connection->reset();
} else {
this->state = ConnectionState::INIT_HIGH;
assert(this->state == ClientState::INITIALIZING);
if(this->crypto_handshake_state != CryptoHandshakeState::INITEV) {
logTrace(this->getServerId(), "{} Received a duplicated initiv. It seems like our initivexpand2 hasn't yet reached the client. The acknowledge handler should handle this issue for us.", CLIENT_STR_LOG_PREFIX);
return command_result{error::ok};
}
}
state_lock.unlock();
this->connection->reset();
this->connection->packet_decoder().register_initiv_packet();
this->connection->packet_decoder().set_protocol_encrypted(false);
connection->reset();
connection->packet_decoder().register_initiv_packet();
connection->packet_decoder().set_protocol_encrypted(false);
bool use_teaspeak = command.hasParm("teaspeak");
if(use_teaspeak ? !config::server::clients::teaspeak : !config::server::clients::teamspeak)
@ -145,6 +148,7 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) {
initivexpand2["ot"] = 1;
this->sendCommand(initivexpand2);
this->crypto_handshake_state = CryptoHandshakeState::CLIENT_EK;
this->handshake.state = HandshakeState::SUCCEEDED; /* we're doing the verify via TeamSpeak */
} else {
debugMessage(this->getServerId(), "{} Got non client 3.1 protocol with build timestamp {}", CLIENT_STR_LOG_PREFIX, this->crypto.client_time);
@ -167,34 +171,41 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) {
this->handshake.state = HandshakeState::SUCCEEDED; /* we're doing the verify via TeamSpeak */
}
this->sendCommand0(initivexpand.build(), false, true); //If we setup the encryption now
this->crypto_handshake_state = CryptoHandshakeState::DONE;
}
{
string error;
if(!this->connection->getCryptHandler()->setupSharedSecret(this->crypto.alpha, this->crypto.beta, this->crypto.remote_key.get(), this->server->serverKey(), error)){
if(!connection->getCryptHandler()->setupSharedSecret(this->crypto.alpha, this->crypto.beta, this->crypto.remote_key.get(), this->server->serverKey(), error)){
logError(this->server->getServerId(), "Could not setup shared secret! (" + error + ")");
return ts::command_result{error::vs_critical};
}
auto& decoder = this->connection->packet_decoder();
auto& decoder = connection->packet_decoder();
decoder.set_protocol_encrypted(true);
}
}
return ts::command_result{error::ok};
}
ts::command_result VoiceClient::handleCommandClientEk(Command& cmd) {
auto connection = this->connection_;
if(!connection) return ts::command_result{error::vs_critical};
if(this->crypto_handshake_state != CryptoHandshakeState::CLIENT_EK) return ts::command_result{error::client_hacked};
this->last_packet_handshake = system_clock::now();
debugMessage(this->getServerId(), "{} Got client ek!", CLIENT_STR_LOG_PREFIX);
auto client_key = base64::decode(cmd["ek"]);
auto private_key = this->crypto.chain_data->chain->generatePrivateKey(this->crypto.chain_data->root_key, this->crypto.chain_data->root_index);
this->connection->getCryptHandler()->setupSharedSecretNew(this->crypto.alpha, this->crypto.beta, (char*) private_key.data(), client_key.data());
this->connection->acknowledge_handler.reset();
connection->getCryptHandler()->setupSharedSecretNew(this->crypto.alpha, this->crypto.beta, (char*) private_key.data(), client_key.data());
connection->acknowledge_handler.reset();
this->crypto_handshake_state = CryptoHandshakeState::DONE;
auto& decoder = this->connection->packet_decoder();
auto& decoder = connection->packet_decoder();
decoder.set_protocol_encrypted(true);
this->connection->send_packet_acknowledge(1, false); //Send the encrypted acknowledge (most the times the second packet; If not we're going into the resend loop)
connection->send_packet_acknowledge(1, false); //Send the encrypted acknowledge (most the times the second packet; If not we're going into the resend loop)
return ts::command_result{error::ok};
}

View File

@ -1,7 +1,6 @@
#include <misc/endianness.h>
#include <log/LogUtils.h>
#include "../web/WebClient.h"
#include "VoiceClient.h"
#include "./VoiceClient.h"
using namespace std;
using namespace std::chrono;
@ -10,6 +9,11 @@ using namespace ts::protocol;
//#define PKT_LOG_PING
void VoiceClient::handlePacketCommand(const pipes::buffer_view& command_string) {
{
std::lock_guard slock{this->state_lock};
if(this->state == ClientState::DISCONNECTED) return;
}
std::unique_ptr<Command> command;
command_result result{};
try {

View File

@ -125,7 +125,7 @@ inline bool is_ssl_handshake_header(const pipes::buffer_view& buffer) {
void WebClient::processNextMessage(const std::chrono::system_clock::time_point& /* scheduled */) {
lock_guard execute_lock(this->execute_lock);
if(this->state != ConnectionState::INIT_HIGH && this->state != ConnectionState::INIT_LOW && this->state != ConnectionState::CONNECTED)
if(this->state != ClientState::INIT_HIGH && this->state != ClientState::INITIALIZING && this->state != ClientState::CONNECTED)
return;
unique_lock buffer_lock(this->queue_lock);

View File

@ -26,7 +26,7 @@ WebClient::WebClient(WebControlServer* server, int fd) : SpeakingClient(server->
memtrack::allocated<WebClient>(this);
assert(server->getTS());
this->state = ConnectionState::INIT_LOW;
this->state = ClientState::INITIALIZING;
this->file_descriptor = fd;
}
@ -178,9 +178,9 @@ bool WebClient::close_connection(const std::chrono::system_clock::time_point& ti
assert(self_lock);
unique_lock state_lock(this->state_lock);
if(this->state == ConnectionState::DISCONNECTED) return false;
if(this->state == ConnectionState::DISCONNECTING && flushing) return true;
this->state = flushing ? ConnectionState::DISCONNECTING : ConnectionState::DISCONNECTED;
if(this->state == ClientState::DISCONNECTED) return false;
if(this->state == ClientState::DISCONNECTING && flushing) return true;
this->state = flushing ? ClientState::DISCONNECTING : ClientState::DISCONNECTED;
unique_lock close_lock(this->close_lock);
state_lock.unlock();
@ -203,7 +203,7 @@ bool WebClient::close_connection(const std::chrono::system_clock::time_point& ti
while(true) {
{
lock_guard lock(self_lock->state_lock);
if(self_lock->state != ConnectionState::DISCONNECTING) return; /* somebody else had this problem now */
if(self_lock->state != ClientState::DISCONNECTING) return; /* somebody else had this problem now */
}
flag_flushed = true;
@ -226,8 +226,8 @@ bool WebClient::close_connection(const std::chrono::system_clock::time_point& ti
{
lock_guard lock(self_lock->state_lock);
if(self_lock->state != ConnectionState::DISCONNECTING) return; /* somebody else had this problem now */
self_lock->state = ConnectionState::DISCONNECTED;
if(self_lock->state != ClientState::DISCONNECTING) return; /* somebody else had this problem now */
self_lock->state = ClientState::DISCONNECTED;
}
/* we can lock here again because we've already ensured that we're still disconnecting and updated the status to disconnected.
* So no thread will wait for this thread while close_lock had been locked
@ -246,7 +246,7 @@ bool WebClient::close_connection(const std::chrono::system_clock::time_point& ti
}
command_result WebClient::handleCommand(Command &command) {
if(this->connectionState() == ConnectionState::INIT_HIGH && this->handshake.state == HandshakeState::SUCCEEDED){
if(this->connectionState() == ClientState::INIT_HIGH && this->handshake.state == HandshakeState::SUCCEEDED){
if(command.command() == "clientinit") {
auto result = this->handleCommandClientInit(command);
if(result.error_code())
@ -289,7 +289,7 @@ void WebClient::tick(const std::chrono::system_clock::time_point& point) {
}
void WebClient::onWSConnected() {
this->state = ConnectionState::INIT_HIGH;
this->state = ClientState::INIT_HIGH;
this->handshake.state = HandshakeState::BEGIN;
debugMessage(this->getServerId(), "{} WebSocket handshake completed!", CLIENT_STR_LOG_PREFIX);
//TODO here!

View File

@ -4,6 +4,7 @@
#include <netdb.h>
#include <cassert>
#include <misc/strobf.h>
#include <misc/net.h>
#include <log/LogUtils.h>
#include <src/Configuration.h>
#include <src/ShutdownHelper.h>

View File

@ -301,7 +301,7 @@ shared_ptr<ts::server::VoiceClient> POWHandler::register_verified_client(const s
voice_client->initialize();
voice_client->socket = client->socket;
voice_client->state = ConnectionState::INIT_LOW;
voice_client->state = ClientState::INITIALIZING;
memcpy(&voice_client->address_info, &client->address_info, sizeof(client->address_info));
{

View File

@ -4,7 +4,6 @@
#include <netinet/in.h>
#include <pipes/buffer.h>
#include <Definitions.h>
#include "VoiceServer.h"
#include "src/VirtualServer.h"
#include "./udp-server/PrecomputedPuzzles.h"

View File

@ -13,6 +13,7 @@
#include <src/InstanceHandler.h>
#include <log/LogUtils.h>
#include <src/client/ConnectedClient.h>
#include "src/client/query/QueryClientConnection.h"
using namespace std;
using namespace std::chrono;
@ -23,6 +24,10 @@ using namespace ts::server;
#define TCP_NOPUSH TCP_CORK
#endif
namespace ts::server::server::query {
extern thread_local bool thread_is_event_loop;
}
QueryServer::QueryServer(sql::SqlManager* db) : sql(db) {
this->_executePool = new threads::ThreadPool(4, "EXEC Query");
}
@ -79,6 +84,7 @@ bool QueryServer::start(const deque<shared_ptr<QueryServer::Binding>> &bindings,
{
this->eventLoop = event_base_new();
this->ioThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [&]{
ts::server::server::query::thread_is_event_loop = true;
while(this->active) {
debugMessage(LOG_QUERY, "Entering event loop ({})", (void*) this->eventLoop);
event_base_loop(this->eventLoop, EVLOOP_NO_EXIT_ON_EMPTY);
@ -386,18 +392,19 @@ void QueryServer::on_client_receive(int _server_file_descriptor, short ev, void
}
}
shared_ptr<QueryClient> client = std::make_shared<QueryClient>(this, file_descriptor);
client->applySelfLock(client);
std::string error{};
auto client = std::make_shared<QueryClient>(this, file_descriptor);
if(!client->initialize(error, client)) {
logError(LOG_QUERY, "Failed to initialize newly accepted query client: {}", error);
return;
}
memcpy(&client->remote_address, &remote_address, sizeof(remote_address));
{
lock_guard lock(this->connected_clients_lock);
this->connectedClients.push_back(client);
}
client->preInitialize();
if(client->readEvent) {
event_add(client->readEvent, nullptr);
}
client->connection->add_read_event();
logMessage(LOG_QUERY, "Got new client from {}", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort()));
}

View File

@ -80,6 +80,8 @@ namespace ts {
bool change_query_password(const std::shared_ptr<QueryAccount>& /* account */, const std::string& /* new password */);
threads::ThreadPool* executePool() { return this->_executePool; }
[[nodiscard]] inline event_base* io_event_loop() { return this->eventLoop; }
[[nodiscard]] inline std::thread::id io_event_loop_id() { return {}; }
private:
sql::SqlManager* sql;

View File

@ -12,8 +12,11 @@ namespace ts {
namespace server {
class VirtualServer;
class VoiceServer;
class VoiceClient;
}
namespace connection {
class VoiceClientConnection;
}
namespace io {
union pktinfo_storage {
in_pktinfo v4;
@ -84,7 +87,7 @@ namespace ts {
datagram_packet_t dg_write_queue_head = nullptr;
datagram_packet_t dg_write_queue_tail = nullptr;
std::deque<std::weak_ptr<server::VoiceClient>> voice_write_queue;
std::deque<std::weak_ptr<connection::VoiceClientConnection>> voice_write_queue;
inline datagram_packet_t pop_dg_write_queue() {
std::lock_guard lock(this->write_queue_lock);
@ -112,13 +115,13 @@ namespace ts {
this->dg_write_queue_tail = packet;
}
inline void push_voice_write_queue(const std::shared_ptr<server::VoiceClient>& client) {
inline void push_voice_write_queue(const std::shared_ptr<connection::VoiceClientConnection>& client) {
std::lock_guard lock(this->write_queue_lock);
this->voice_write_queue.push_back(client);
}
/* return 0 on success | 1 on there is more, but success | 2 on empty */
inline int pop_voice_write_queue(std::shared_ptr<server::VoiceClient>& result) {
inline int pop_voice_write_queue(std::shared_ptr<connection::VoiceClientConnection>& result) {
std::lock_guard lock(this->write_queue_lock);
auto it_begin = this->voice_write_queue.begin();

View File

@ -5,6 +5,7 @@
#include <unistd.h>
#include <fcntl.h>
#include "VoiceServer.h"
#include "../client/voice/VoiceClientConnection.h"
#include "../client/voice/VoiceClient.h"
#include "../Configuration.h"
#include <log/LogUtils.h>
@ -109,7 +110,7 @@ bool VoiceServer::start(const std::deque<std::shared_ptr<VoiceServerBinding>>& b
return true;
}
void VoiceServer::triggerWrite(const std::shared_ptr<VoiceClient>& client) {
void VoiceServer::triggerWrite(const std::shared_ptr<connection::VoiceClientConnection>& client) {
if(!client) {
logError(this->server->getServerId(), "Invalid client for triggerWrite()");
return;
@ -118,7 +119,7 @@ void VoiceServer::triggerWrite(const std::shared_ptr<VoiceClient>& client) {
this->io->invoke_write(client);
}
void VoiceServer::schedule_command_handling(const ts::server::VoiceClient *client) {
void VoiceServer::schedule_command_handling(const connection::VoiceClientConnection *client) {
auto vmanager = serverInstance->getVoiceServerManager();
if(!vmanager)
return;
@ -129,7 +130,7 @@ void VoiceServer::schedule_command_handling(const ts::server::VoiceClient *clien
evloop->schedule(client->event_handle_packet);
}
void VoiceServer::tickHandshakingClients() {
void VoiceServer::tickClients() {
this->pow_handler->execute_tick();
decltype(this->activeConnections) connections;
@ -138,8 +139,7 @@ void VoiceServer::tickHandshakingClients() {
connections = this->activeConnections;
}
for(const auto& client : connections)
if(client->state == ConnectionState::INIT_HIGH || client->state == ConnectionState::INIT_LOW)
client->tick(system_clock::now());
client->tick();
}
void VoiceServer::execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next) {
@ -157,7 +157,7 @@ void VoiceServer::execute_resend(const std::chrono::system_clock::time_point &no
if (connection->acknowledge_handler.execute_resend(now, next, buffers, error) < 0) {
debugMessage(client->getServerId(), "{} Failed to execute packet resend: {}", CLIENT_STR_LOG_PREFIX_(client), error);
if(client->state == ConnectionState::CONNECTED) {
if(client->state == ClientState::CONNECTED) {
client->disconnect(ViewReasonId::VREASON_TIMEOUT, config::messages::timeout::packet_resend_failed, nullptr, true);
} else {
client->close_connection(system_clock::now() + seconds(1));
@ -342,7 +342,7 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) {
continue;
if(memcmp(&client->remote_address, &remote_address, sizeof(sockaddr_storage)) != 0) { /* verify the remote address */
if((read_buffer[12] & 0x80) == 0 && client->state == ConnectionState::CONNECTED) { /* only encrypted packets are allowed */
if((read_buffer[12] & 0x80) == 0 && client->state == ClientState::CONNECTED) { /* only encrypted packets are allowed */
if(client->connection->verify_encryption(read_buffer.view(0, bytes_read))) { /* the ip had changed */
auto old_address = net::to_string(client->remote_address);
auto new_address = net::to_string(remote_address);
@ -357,7 +357,7 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) {
}
}
if(client->state != ConnectionState::DISCONNECTED){
if(client->state != ClientState::DISCONNECTED){
client->connection->handle_incoming_datagram(read_buffer.view(0, bytes_read));
client = nullptr;
}

View File

@ -10,84 +10,78 @@
#include <protocol/ringbuffer.h>
#include "VoiceIOManager.h"
namespace ts {
namespace server {
namespace server::udp {
class POWHandler;
}
class VirtualServer;
class ConnectedClient;
class VoiceClient;
namespace ts::connection {
class VoiceClientConnection;
}
struct VoiceServerBinding {
sockaddr_storage address{};
int file_descriptor = 0;
inline std::string address_string() { return net::to_string(address); }
inline uint16_t address_port() { return net::port(address); }
};
class VoiceServer {
friend class VoiceClient;
friend class io::VoiceIOManager;
friend struct io::IOEventLoopEvents;
friend class server::udp::POWHandler;
public:
explicit VoiceServer(const std::shared_ptr<VirtualServer>& server);
~VoiceServer();
bool start(const std::deque<std::shared_ptr<VoiceServerBinding>>&, std::string&);
bool stop(const std::chrono::milliseconds& flushTimeout = std::chrono::milliseconds(1000));
std::shared_ptr<VoiceClient> findClient(ClientId);
std::shared_ptr<VoiceClient> findClient(sockaddr_in* addr, bool lock = true);
std::shared_ptr<VoiceClient> findClient(sockaddr_in6* addr, bool lock = true);
inline std::shared_ptr<VoiceClient> findClient(sockaddr_storage* address, bool lock = true) {
return address->ss_family == AF_INET ?
this->findClient((sockaddr_in*) address, lock) :
address->ss_family == AF_INET6 ?
this->findClient((sockaddr_in6*) address, lock) :
nullptr;
}
bool unregisterConnection(std::shared_ptr<VoiceClient>);
inline std::deque<std::shared_ptr<VoiceServerBinding>> activeBindings() {
std::deque<std::shared_ptr<VoiceServerBinding>> result;
for(const auto& entry : this->bindings)
if(entry->file_descriptor > 0) result.push_back(entry);
return result;
}
inline std::shared_ptr<VirtualServer> get_server() { return this->server; }
private:
std::unique_ptr<server::udp::POWHandler> pow_handler;
std::shared_ptr<VirtualServer> server = nullptr;
bool running = false;
std::deque<std::shared_ptr<VoiceServerBinding>> bindings;
std::recursive_mutex connectionLock;
std::deque<std::shared_ptr<VoiceClient>> activeConnections;
public: //lib event
void triggerWrite(const std::shared_ptr<VoiceClient> &);
void schedule_command_handling(VoiceClient const *client);
void tickHandshakingClients();
void execute_resend(const std::chrono::system_clock::time_point& /* now */, std::chrono::system_clock::time_point& /* next resend */);
void send_datagram(int /* socket */, io::datagram_packet_t /* packet */);
std::shared_ptr<io::IOServerHandler> io;
private:
static void handleMessageRead(int, short, void *);
static void handleMessageWrite(int, short, void *);
/* execute loop */
/* TODO
std::mutex execute_list_lock;
protocol::RingBuffer<ClientId, 128, uint8_t> execute_list;
void run_execute_clients();
*/
};
namespace ts::server {
namespace server::udp {
class POWHandler;
}
class VirtualServer;
class ConnectedClient;
struct VoiceServerBinding {
sockaddr_storage address{};
int file_descriptor = 0;
inline std::string address_string() { return net::to_string(address); }
inline uint16_t address_port() { return net::port(address); }
};
class VoiceServer {
friend class VoiceClient;
friend class io::VoiceIOManager;
friend struct io::IOEventLoopEvents;
friend class server::udp::POWHandler;
public:
explicit VoiceServer(const std::shared_ptr<VirtualServer>& server);
~VoiceServer();
bool start(const std::deque<std::shared_ptr<VoiceServerBinding>>&, std::string&);
bool stop(const std::chrono::milliseconds& flushTimeout = std::chrono::milliseconds(1000));
std::shared_ptr<connection::VoiceClientConnection> findClient(sockaddr_in* addr, bool lock = true);
std::shared_ptr<connection::VoiceClientConnection> findClient(sockaddr_in6* addr, bool lock = true);
inline std::shared_ptr<connection::VoiceClientConnection> findClient(sockaddr_storage* address, bool lock = true) {
return address->ss_family == AF_INET ?
this->findClient((sockaddr_in*) address, lock) :
address->ss_family == AF_INET6 ?
this->findClient((sockaddr_in6*) address, lock) :
nullptr;
}
bool unregisterConnection(std::shared_ptr<connection::VoiceClientConnection>);
inline std::deque<std::shared_ptr<VoiceServerBinding>> activeBindings() {
std::deque<std::shared_ptr<VoiceServerBinding>> result;
for(const auto& entry : this->bindings)
if(entry->file_descriptor > 0) result.push_back(entry);
return result;
}
inline std::shared_ptr<VirtualServer> get_server() { return this->server; }
private:
std::unique_ptr<server::udp::POWHandler> pow_handler;
std::shared_ptr<VirtualServer> server = nullptr;
bool running = false;
std::deque<std::shared_ptr<VoiceServerBinding>> bindings;
std::recursive_mutex connectionLock;
std::deque<std::shared_ptr<connection::VoiceClientConnection>> activeConnections;
public: //lib event
void triggerWrite(const std::shared_ptr<connection::VoiceClientConnection> &);
void schedule_command_handling(connection::VoiceClientConnection const *client);
void tickClients();
void execute_resend(const std::chrono::system_clock::time_point& /* now */, std::chrono::system_clock::time_point& /* next resend */);
void send_datagram(int /* socket */, io::datagram_packet_t /* packet */);
std::shared_ptr<io::IOServerHandler> io;
private:
static void handleMessageRead(int, short, void *);
static void handleMessageWrite(int, short, void *);
};
}

View File

@ -9,11 +9,15 @@
#include <deque>
namespace ts::server {
class VoiceClient;
class VirtualServer;
}
namespace vserver {
class VirtualServerBase;
}
namespace ts::connection {
class VoiceClientConnection;
}
namespace ts::server::vserver {
class VirtualServerBase;
}
namespace ts::server::server::udp {
@ -61,14 +65,14 @@ namespace ts::server::server::udp {
int file_descriptor{0};
event* event_read{};
event* event_write{};
struct event* event_read{};
struct event* event_write{};
spin_lock write_queue_lock{};
datagram_packet* dg_write_queue_head{nullptr};
datagram_packet* dg_write_queue_tail{nullptr};
write_ring_queue<std::weak_ptr<VoiceClient>, 1024 * 8> voice_write_queue{};
write_ring_queue<std::weak_ptr<connection::VoiceClientConnection>, 1024 * 8> voice_write_queue{};
};
struct io_loop {
@ -80,13 +84,18 @@ namespace ts::server::server::udp {
};
struct io_binding {
vserver::VirtualServerBase* virtual_server{nullptr};
VirtualServer* virtual_server{nullptr};
sockaddr_storage address{};
size_t loop_entry_index{0};
std::vector<io_loop_entry*> loop_entries{};
};
enum struct ServerRegisterResult {
SUCCESS,
FAILED_TO_BIND
};
class Server {
public:
Server();
@ -95,19 +104,22 @@ namespace ts::server::server::udp {
bool initialize(std::string& /* error */);
void finalize();
void register_virtual_server(vserver::VirtualServerBase* /* server */);
ServerRegisterResult register_virtual_server(VirtualServer* /* server */);
/* this will block until all executions have been finished */
void unregister_virtual_server(vserver::VirtualServerBase* /* server */);
void unregister_virtual_server(VirtualServer* /* server */);
void schedule_client_write(const std::shared_ptr<VoiceClient>& /* client */);
void unregister_client(const std::shared_ptr<VoiceClient>& /* client */);
void schedule_client_write(const std::shared_ptr<connection::VoiceClientConnection>& /* client */);
void unregister_client(const std::shared_ptr<connection::VoiceClientConnection>& /* client */);
private:
std::mutex io_lock{};
std::vector<io_loop*> io_loops{};
std::mutex bindings_lock{};
std::vector<io_binding*> io_bindings{}; /* may contains nullptr! */
std::thread client_tick_thread{};
void execute_client_ticking();
};
}

2
shared

@ -1 +1 @@
Subproject commit 62292af022798db2aba9ae5aa69aebbb849fb75a
Subproject commit eb503d43156fbcb091fee9f9a33110b1be99c2ed