Some more updates

This commit is contained in:
WolverinDEV 2020-03-10 17:03:38 +01:00
parent 799df15ace
commit 60a1c34dc9
26 changed files with 603 additions and 251 deletions

View File

@ -161,7 +161,7 @@ if (COMPILE_WEB_CLIENT)
src/client/web/WSWebClient.cpp src/client/web/WSWebClient.cpp
src/client/web/SampleHandler.cpp src/client/web/SampleHandler.cpp
src/client/web/VoiceBridge.cpp src/client/web/VoiceBridge.cpp
src/client/command_handler/helpers.h src/music/PlaylistPermissions.cpp src/music/PlaylistPermissions.h src/lincense/LicenseService.cpp src/lincense/LicenseService.h src/groups/GroupManager.cpp src/groups/GroupManager.h src/groups/GroupAssignmentManager.cpp src/groups/GroupAssignmentManager.h src/groups/Group.cpp src/groups/Group.h src/services/VirtualServerInformation.cpp src/services/VirtualServerInformation.h src/vserver/VirtualServerManager.cpp src/vserver/VirtualServerManager.h src/services/VirtualServerBroadcastService.cpp src/services/VirtualServerBroadcastService.h src/server/udp-server/UDPServer.cpp src/server/udp-server/UDPServer.h) src/client/command_handler/helpers.h src/music/PlaylistPermissions.cpp src/music/PlaylistPermissions.h src/lincense/LicenseService.cpp src/lincense/LicenseService.h src/groups/GroupManager.cpp src/groups/GroupManager.h src/groups/GroupAssignmentManager.cpp src/groups/GroupAssignmentManager.h src/groups/Group.cpp src/groups/Group.h src/services/VirtualServerInformation.cpp src/services/VirtualServerInformation.h src/vserver/VirtualServerManager.cpp src/vserver/VirtualServerManager.h src/services/VirtualServerBroadcastService.cpp src/services/VirtualServerBroadcastService.h src/server/udp-server/UDPServer.cpp src/server/udp-server/UDPServer.h src/client/voice/PacketEncoder.cpp src/client/voice/PacketEncoder.h src/client/voice/PacketDecoder.cpp src/client/voice/PacketDecoder.h)
endif () endif ()
add_executable(PermHelper helpers/permgen.cpp) add_executable(PermHelper helpers/permgen.cpp)

View File

@ -42,8 +42,8 @@ void ts::server::shutdownInstance(const std::string& message) {
mainThreadActive = false; mainThreadActive = false;
} }
std::shared_ptr<server::ShutdownData> currentShutdown = nullptr; std::shared_ptr<ts::server::ShutdownData> currentShutdown = nullptr;
std::shared_ptr<server::ShutdownData> server::scheduledShutdown() { return currentShutdown; } std::shared_ptr<ts::server::ShutdownData> ts::server::scheduledShutdown() { return currentShutdown; }
inline void broadcastMessage(const std::string& message) { inline void broadcastMessage(const std::string& message) {
if(!serverInstance || !serverInstance->getVoiceServerManager()) if(!serverInstance || !serverInstance->getVoiceServerManager())
@ -57,8 +57,8 @@ inline void broadcastMessage(const std::string& message) {
} }
void executeScheduledShutdown(const std::shared_ptr<ShutdownData>& data); void executeScheduledShutdown(const std::shared_ptr<ShutdownData>& data);
bool server::scheduleShutdown(const std::chrono::system_clock::time_point& time, const std::string& reason) { bool ts::server::scheduleShutdown(const std::chrono::system_clock::time_point& time, const std::string& reason) {
server::cancelShutdown(false); //Cancel old shutdown ts::server::cancelShutdown(false); //Cancel old shutdown
auto data = std::make_shared<ShutdownData>(); auto data = std::make_shared<ShutdownData>();
data->active = true; data->active = true;
@ -73,13 +73,13 @@ bool server::scheduleShutdown(const std::chrono::system_clock::time_point& time,
return true; return true;
} }
void server::cancelShutdown(bool notify) { void ts::server::cancelShutdown(bool notify) {
if(!currentShutdown) return; if(!currentShutdown) return;
if(notify && !config::messages::shutdown::canceled.empty()) { if(notify && !config::messages::shutdown::canceled.empty()) {
broadcastMessage(config::messages::shutdown::canceled); broadcastMessage(config::messages::shutdown::canceled);
} }
auto current = server::scheduledShutdown(); auto current = ts::server::scheduledShutdown();
current->active = false; current->active = false;
current->shutdownNotify.notify_all(); current->shutdownNotify.notify_all();
if(!threads::save_join(current->shutdown_thread)) { if(!threads::save_join(current->shutdown_thread)) {

View File

@ -154,7 +154,7 @@ bool VirtualServer::initialize(bool test_properties) {
if(default_channel->properties()[property::CHANNEL_FLAG_PASSWORD].as<bool>()) if(default_channel->properties()[property::CHANNEL_FLAG_PASSWORD].as<bool>())
default_channel->properties()[property::CHANNEL_FLAG_PASSWORD] = false; default_channel->properties()[property::CHANNEL_FLAG_PASSWORD] = false;
this->tokenManager = new server::tokens::TokenManager(this); this->tokenManager = new ts::server::tokens::TokenManager(this);
this->tokenManager->loadTokens(); this->tokenManager->loadTokens();
this->complains = new ComplainManager(this); this->complains = new ComplainManager(this);
@ -1108,7 +1108,7 @@ bool VirtualServer::resetPermissions(std::string& token) {
this->properties()[property::VIRTUALSERVER_DEFAULT_CHANNEL_GROUP] = this->getGroupManager()->findGroup(GroupTarget::GROUPTARGET_CHANNEL, default_channel_guest->name()).front()->groupId(); this->properties()[property::VIRTUALSERVER_DEFAULT_CHANNEL_GROUP] = this->getGroupManager()->findGroup(GroupTarget::GROUPTARGET_CHANNEL, default_channel_guest->name()).front()->groupId();
auto token_admin = this->getGroupManager()->findGroup(GroupTarget::GROUPTARGET_SERVER, default_server_admin->name()).front()->groupId(); auto token_admin = this->getGroupManager()->findGroup(GroupTarget::GROUPTARGET_SERVER, default_server_admin->name()).front()->groupId();
auto created = this->tokenManager->createToken(server::tokens::TOKEN_SERVER, token_admin, "Default server token for the server admin."); auto created = this->tokenManager->createToken(ts::server::tokens::TOKEN_SERVER, token_admin, "Default server token for the server admin.");
if(!created) { if(!created) {
logCritical(this->serverId, "Failed to generate default serveradmin token!"); logCritical(this->serverId, "Failed to generate default serveradmin token!");
} else { } else {

View File

@ -290,7 +290,7 @@ namespace ts {
std::shared_ptr<VoiceServer> udpVoiceServer = nullptr; std::shared_ptr<VoiceServer> udpVoiceServer = nullptr;
WebControlServer* webControlServer = nullptr; WebControlServer* webControlServer = nullptr;
server::tokens::TokenManager* tokenManager = nullptr; ts::server::tokens::TokenManager* tokenManager = nullptr;
ComplainManager* complains = nullptr; ComplainManager* complains = nullptr;
letter::LetterManager* letters = nullptr; letter::LetterManager* letters = nullptr;
std::shared_ptr<music::MusicBotManager> musicManager; std::shared_ptr<music::MusicBotManager> musicManager;

View File

@ -67,7 +67,7 @@ bool VirtualServerManager::initialize(bool autostart) {
this->state = State::STARTING; this->state = State::STARTING;
logMessage(LOG_INSTANCE, "Generating server puzzles..."); logMessage(LOG_INSTANCE, "Generating server puzzles...");
auto start = system_clock::now(); auto start = system_clock::now();
this->puzzles->precomputePuzzles(config::voice::DefaultPuzzlePrecomputeSize); this->puzzles->precompute_puzzles(config::voice::DefaultPuzzlePrecomputeSize);
logMessage(LOG_INSTANCE, "Puzzles generated! Time required: " + to_string(duration_cast<milliseconds>(system_clock::now() - start).count()) + "ms"); logMessage(LOG_INSTANCE, "Puzzles generated! Time required: " + to_string(duration_cast<milliseconds>(system_clock::now() - start).count()) + "ms");
size_t serverCount = 0; size_t serverCount = 0;

View File

@ -6,7 +6,7 @@ namespace ts {
namespace server { namespace server {
class InternalClient : public ConnectedClient { class InternalClient : public ConnectedClient {
public: public:
InternalClient(sql::SqlManager*, const std::shared_ptr<server::VirtualServer>&, std::string, bool); InternalClient(sql::SqlManager*, const std::shared_ptr<ts::server::VirtualServer>&, std::string, bool);
~InternalClient(); ~InternalClient();
void setSharedLock(const std::shared_ptr<ConnectedClient>& _this){ void setSharedLock(const std::shared_ptr<ConnectedClient>& _this){

View File

@ -101,7 +101,7 @@ bool MusicClient::disconnect(const std::string &reason) {
return true; return true;
} }
bool server::MusicClient::notifyClientMoved( bool MusicClient::notifyClientMoved(
const std::shared_ptr<ConnectedClient> &client, const std::shared_ptr<ConnectedClient> &client,
const std::shared_ptr<BasicChannel> &target_channel, const std::shared_ptr<BasicChannel> &target_channel,
ViewReasonId reason, ViewReasonId reason,

View File

@ -0,0 +1,15 @@
//
// Created by WolverinDEV on 10/03/2020.
//
#include "PacketDecoder.h"
#include <protocol/buffers.h>
#include <protocol/AcknowledgeManager.h>
#include <protocol/CompressionHandler.h>
#include <protocol/CryptHandler.cpp>
#include "../../ConnectionStatistics.h"
using namespace ts;
using namespace ts::server::server::udp;

View File

@ -0,0 +1,72 @@
#pragma once
#include <misc/spin_lock.h>
#include <mutex>
#include <deque>
#include <protocol/Packet.h>
#include <protocol/generation.h>
#include <protocol/ringbuffer.h>
namespace ts::connection {
class CryptHandler;
class CompressionHandler;
class AcknowledgeManager;
}
namespace ts::stats {
class ConnectionStatistics;
}
namespace ts::server::server::udp {
struct CommandFragment {
uint16_t packet_id{0};
uint16_t packet_generation{0};
uint8_t packet_flags{0};
uint32_t payload_length : 24;
pipes::buffer payload{};
CommandFragment() { this->payload_length = 0; }
CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload)
: packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags}, payload_length{payloadLength}, payload{std::move(payload)} {}
CommandFragment& operator=(const CommandFragment&) = default;
CommandFragment(const CommandFragment& other) = default;
CommandFragment(CommandFragment&&) = default;
};
static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer));
class PacketDecoder {
typedef protocol::PacketRingBuffer<CommandFragment, 32, CommandFragment> command_fragment_buffer_t;
typedef std::array<command_fragment_buffer_t, 2> command_packet_reassembler;
public:
typedef std::function<void(const protocol::ClientPacketParser&)> callback_decoded_packet_t;
PacketDecoder(connection::CryptHandler* /* crypt handler */, connection::CompressionHandler* /* compress handler */, connection::AcknowledgeManager* /* acknowledge handler */);
~PacketDecoder();
void reset();
void decode_incoming_data(const pipes::buffer_view &/* buffer */);
[[nodiscard]] inline std::shared_ptr<stats::ConnectionStatistics> get_statistics() { return this->statistics_; }
inline void set_statistics(const std::shared_ptr<stats::ConnectionStatistics>& stats) { this->statistics_ = stats; }
[[nodiscard]] inline bool is_protocol_encrypted() const { return this->protocol_encrypted; }
void set_protocol_encrypted(bool flag) { this->protocol_encrypted = flag; }
callback_decoded_packet_t callback_decoded_packet{};
private:
bool protocol_encrypted{false};
std::shared_ptr<stats::ConnectionStatistics> statistics_{nullptr};
connection::CryptHandler* crypt_handler_{nullptr};
connection::CompressionHandler* compress_handler_{nullptr};
connection::AcknowledgeManager* acknowledge_handler_{nullptr};
std::array<protocol::generation_estimator, 9> incoming_generation_estimators{}; /* implementation is thread save */
std::recursive_mutex packet_buffer_lock;
command_packet_reassembler _command_fragment_buffers;
};
}

View File

@ -0,0 +1,230 @@
//
// Created by WolverinDEV on 09/03/2020.
//
#include "PacketEncoder.h"
#include <protocol/buffers.h>
#include <protocol/AcknowledgeManager.h>
#include <protocol/CompressionHandler.h>
#include <protocol/CryptHandler.cpp>
#include "../../ConnectionStatistics.h"
using namespace ts;
using namespace ts::server::server::udp;
PacketEncoder::PacketEncoder(ts::connection::CryptHandler *crypt_handler, ts::connection::CompressionHandler *compress_handler,
ts::connection::AcknowledgeManager *ack_handler) : crypt_handler_{crypt_handler}, compress_handler_{compress_handler}, acknowledge_handler_{ack_handler} {
memtrack::allocated<PacketEncoder>(this);
}
PacketEncoder::~PacketEncoder() {
memtrack::freed<PacketEncoder>(this);
this->reset();
}
void PacketEncoder::reset() {
for(auto& category : this->write_preprocess_queues) {
std::lock_guard work_lock{category.work_lock};
std::lock_guard queue_lock{category.queue_lock};
category.queue.clear();
}
this->id_generator.reset();
}
bool PacketEncoder::encode_packet(const std::shared_ptr<protocol::ServerPacket> &original_packet, EncodeFlags flags) {
std::shared_ptr<protocol::ServerPacket> packet;
if(flags & EncodeFlags::no_copy) {
packet = original_packet;
} else {
packet = protocol::ServerPacket::from_buffer(original_packet->buffer().dup(buffer::allocate_buffer(original_packet->buffer().length())));
if(original_packet->getListener())
packet->setListener(std::move(original_packet->getListener()));
packet->memory_state.flags = original_packet->memory_state.flags;
}
auto type = EncodeProcessCategory::from_type(packet->type().type());
auto& queue = this->write_preprocess_queues[type];
if(flags & EncodeFlags::sync) {
std::string error{};
std::vector<pipes::buffer> buffers{};
this->process_count++;
{
std::unique_lock work_lock{queue.work_lock};
auto encode_result = this->encode_packet_(error, buffers, packet, work_lock);
if(encode_result != PacketEncodeResult::SUCCESS) {
if(auto callback{this->callback_encode_failed}; callback)
callback(original_packet, encode_result, error);
goto sync_cleanup_exit;
}
}
if(auto callback{this->callback_encoded_buffers}; callback)
callback(buffers);
sync_cleanup_exit:
this->process_count--; /* we're now done preparing */
return false;
} else {
std::lock_guard queue_lock{queue.queue_lock};
queue.queue.push_back(packet);
queue.has_packets = true;
}
return true;
}
bool PacketEncoder::do_encode() {
std::string error{};
std::vector<pipes::buffer> buffers{};
std::shared_ptr<protocol::ServerPacket> packet{nullptr};
bool flag_more{false};
this->process_count++; /* we're preparing a packet */
for(auto& category : this->write_preprocess_queues) {
if(!category.has_packets) {
continue;
} else if(packet) {
flag_more = true;
break;
}
std::unique_lock work_lock{category.work_lock, std::try_to_lock};
if(!work_lock) continue; /* This particular category will already be processed */
{
std::lock_guard buffer_lock{category.queue_lock};
if(category.queue.empty()) {
category.has_packets = false;
continue;
}
packet = std::move(category.queue.front());
category.queue.pop_front();
category.has_packets = !category.queue.empty();
flag_more = category.has_packets;
}
if(auto errc = this->encode_packet_(error, buffers, packet, work_lock); errc != PacketEncodeResult::SUCCESS) {
if(auto callback{this->callback_encode_failed}; callback)
callback(packet, errc, error);
if(flag_more)
break;
else
continue; /* find out if we have more */
}
if(flag_more)
break;
else
continue; /* find out if we have more */
}
/* enqueue buffers for write */
if(!buffers.empty()) {
if(auto callback{this->callback_encoded_buffers}; callback)
callback(buffers);
}
this->process_count--; /* we're now done preparing */
return flag_more;
}
PacketEncodeResult PacketEncoder::encode_packet_(std::string& error,
std::vector<pipes::buffer> &result,
const std::shared_ptr<protocol::ServerPacket> &packet,
std::unique_lock<std::mutex> &work_lock) {
assert(work_lock.owns_lock());
if(packet->type().compressable() && !packet->memory_state.fragment_entry) {
packet->enable_flag(PacketFlag::Compressed);
if(!this->compress_handler_->progressPacketOut(&*packet, error))
return PacketEncodeResult::COMPRESS_FAILED;
}
std::vector<shared_ptr<ServerPacket>> fragments;
fragments.reserve((size_t) (packet->data().length() / packet->type().max_length()) + 1);
if(packet->data().length() > packet->type().max_length()) {
if(!packet->type().fragmentable())
return PacketEncodeResult::PACKET_TOO_LARGE;
{ //Split packets
auto buffer = packet->data();
const auto max_length = packet->type().max_length();
while(buffer.length() > max_length * 2) {
fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer.view(0, max_length).dup(buffer::allocate_buffer(max_length))));
buffer = buffer.range((size_t) max_length);
}
if(buffer.length() > max_length) { //Divide rest by 2
fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer.view(0, buffer.length() / 2).dup(buffer::allocate_buffer(buffer.length() / 2))));
buffer = buffer.range(buffer.length() / 2);
}
fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer));
for(const auto& frag : fragments) {
frag->setFragmentedEntry(true);
frag->enable_flag(PacketFlag::NewProtocol);
}
}
assert(fragments.size() >= 2);
fragments.front()->enable_flag(PacketFlag::Fragmented);
if(packet->has_flag(PacketFlag::Compressed))
fragments.front()->enable_flag(PacketFlag::Compressed);
fragments.back()->enable_flag(PacketFlag::Fragmented);
if(packet->getListener())
fragments.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :)
} else {
fragments.push_back(packet);
}
result.reserve(fragments.size());
/* apply packet ids */
for(const auto& fragment : fragments) {
if(!fragment->memory_state.id_branded)
fragment->applyPacketId(this->id_generator);
}
work_lock.unlock(); /* the rest could be unordered */
CryptHandler::key_t crypt_key{};
CryptHandler::nonce_t crypt_nonce{};
auto statistics = this->statistics_;
for(const auto& fragment : fragments) {
if(fragment->has_flag(PacketFlag::Unencrypted)) {
this->crypt_handler_->write_default_mac(fragment->mac().data_ptr());
} else {
if(this->protocol_encrypted) {
if(!this->crypt_handler_->generate_key_nonce(false, fragment->type().type(), fragment->packetId(), fragment->generationId(), crypt_key, crypt_nonce))
return PacketEncodeResult::ENCRYPT_KEY_GEN_FAILED;
} else {
crypt_key = CryptHandler::default_key;
crypt_nonce = CryptHandler::default_nonce;
}
auto crypt_result = this->crypt_handler_->encrypt(fragment->header().data_ptr(), fragment->header().length(),
fragment->data().data_ptr(), fragment->data().length(),
fragment->mac().data_ptr(),
crypt_key, crypt_nonce, error);
if(!crypt_result)
return PacketEncodeResult::ENCRYPT_FAILED;
}
if(statistics)
statistics->logOutgoingPacket(*fragment);
this->acknowledge_handler_->process_packet(*fragment);
result.push_back(fragment->buffer());
}
return PacketEncodeResult::SUCCESS;
}

View File

@ -0,0 +1,116 @@
#pragma once
#include <misc/spin_lock.h>
#include <mutex>
#include <deque>
#include <protocol/Packet.h>
namespace ts::connection {
class CryptHandler;
class CompressionHandler;
class AcknowledgeManager;
}
namespace ts::stats {
class ConnectionStatistics;
}
namespace ts::server::server::udp {
struct EncodeProcessCategory {
enum value {
PING_PONG = 0, //Ping/Pongs
ACK = 2,
VOICE_WHISPER = 1, //Voice/Whisper
COMMAND = 3,
INIT = 4,
MAX = INIT
};
inline static value from_type(protocol::PacketType type) {
switch(type) {
case protocol::PING:
case protocol::PONG:
return value::PING_PONG;
case protocol::VOICE:
case protocol::VOICE_WHISPER:
return value::VOICE_WHISPER;
case protocol::ACK:
case protocol::ACK_LOW:
return value::ACK;
case protocol::COMMAND:
case protocol::COMMAND_LOW:
return value::COMMAND;
default:
return value::INIT;
}
}
};
enum struct PacketEncodeResult {
SUCCESS,
COMPRESS_FAILED, /* has custom message */
PACKET_TOO_LARGE,
ENCRYPT_KEY_GEN_FAILED,
ENCRYPT_FAILED, /* has custom message */
};
class PacketEncoder {
public:
enum EncodeFlags {
none = 0x0,
no_copy = 0x1, /* do not copy the packet */
sync = 0x02 /* directly process the packet */
};
typedef std::function<void(const std::vector<pipes::buffer>& /* buffers */)> callback_encoded_buffers_t;
typedef std::function<void(const std::shared_ptr<protocol::ServerPacket> &/* the packet */, PacketEncodeResult& /* error */, std::string& /* custom message */)> callback_encode_failed_t;
PacketEncoder(connection::CryptHandler* /* crypt handler */, connection::CompressionHandler* /* compress handler */, connection::AcknowledgeManager* /* acknowledge handler */);
~PacketEncoder();
void reset();
/* returns true if the encoder has something to encode */
bool encode_packet(const std::shared_ptr<protocol::ServerPacket> &/* the packet */, EncodeFlags /* flags */);
bool do_encode();
[[nodiscard]] inline std::shared_ptr<stats::ConnectionStatistics> get_statistics() { return this->statistics_; }
inline void set_statistics(const std::shared_ptr<stats::ConnectionStatistics>& stats) { this->statistics_ = stats; }
[[nodiscard]] inline bool is_protocol_encrypted() const { return this->protocol_encrypted; }
void set_protocol_encrypted(bool flag) { this->protocol_encrypted = flag; }
callback_encoded_buffers_t callback_encoded_buffers{};
callback_encode_failed_t callback_encode_failed{};
private:
bool protocol_encrypted{false};
std::shared_ptr<stats::ConnectionStatistics> statistics_{nullptr};
connection::CryptHandler* crypt_handler_{nullptr};
connection::CompressionHandler* compress_handler_{nullptr};
connection::AcknowledgeManager* acknowledge_handler_{nullptr};
struct PacketEncodeQueue {
bool has_packets{false};
std::mutex work_lock{};
spin_lock queue_lock{};
std::deque<std::shared_ptr<protocol::ServerPacket>> queue{};
};
std::array<PacketEncodeQueue, EncodeProcessCategory::MAX> write_preprocess_queues{};
/* ---------- Processing ---------- */
/* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */
protocol::PacketIdManager id_generator{};
std::atomic<size_t> process_count{0};
PacketEncodeResult encode_packet_(std::string& /* error */, std::vector<pipes::buffer> &result/* buffers which need to be transferred */, const std::shared_ptr<protocol::ServerPacket> &packet/* the packet */, std::unique_lock<std::mutex> &work_lock /* work lock */);
};
}

View File

@ -61,7 +61,7 @@ void VoiceClient::sendCommand0(const std::string_view& cmd, bool low, bool direc
packet->enable_flag(protocol::PacketFlag::NewProtocol); packet->enable_flag(protocol::PacketFlag::NewProtocol);
} }
packet->setListener(std::move(listener)); packet->setListener(std::move(listener));
this->connection->sendPacket(packet, false, direct); this->connection->send_packet(packet, false, direct);
#ifdef PKT_LOG_CMD #ifdef PKT_LOG_CMD
logTrace(this->getServerId(), "{}[Command][Server -> Client] Sending command {}. Command low: {}. Full command: {}", CLIENT_STR_LOG_PREFIX, cmd.substr(0, cmd.find(' ')), low, cmd); logTrace(this->getServerId(), "{}[Command][Server -> Client] Sending command {}. Command low: {}. Full command: {}", CLIENT_STR_LOG_PREFIX, cmd.substr(0, cmd.find(' ')), low, cmd);
@ -74,7 +74,7 @@ void VoiceClient::sendAcknowledge(uint16_t packetId, bool low) {
auto packet = make_shared<protocol::ServerPacket>(low ? protocol::PacketTypeInfo::AckLow : protocol::PacketTypeInfo::Ack, pipes::buffer_view{buffer, 2}); auto packet = make_shared<protocol::ServerPacket>(low ? protocol::PacketTypeInfo::AckLow : protocol::PacketTypeInfo::Ack, pipes::buffer_view{buffer, 2});
packet->enable_flag(PacketFlag::Unencrypted); packet->enable_flag(PacketFlag::Unencrypted);
if(!low) packet->enable_flag(protocol::PacketFlag::NewProtocol); if(!low) packet->enable_flag(protocol::PacketFlag::NewProtocol);
this->connection->sendPacket(packet); this->connection->send_packet(packet);
#ifdef PKT_LOG_ACK #ifdef PKT_LOG_ACK
logTrace(this->getServerId(), "{}[Acknowledge][Server -> Client] Sending acknowledge for {}", CLIENT_STR_LOG_PREFIX, packetId); logTrace(this->getServerId(), "{}[Acknowledge][Server -> Client] Sending acknowledge for {}", CLIENT_STR_LOG_PREFIX, packetId);
#endif #endif
@ -293,7 +293,7 @@ void VoiceClient::send_voice_packet(const pipes::buffer_view &voice_buffer, cons
} }
memcpy(packet->data().data_ptr<void>(), voice_buffer.data_ptr<void>(), voice_buffer.length()); memcpy(packet->data().data_ptr<void>(), voice_buffer.data_ptr<void>(), voice_buffer.length());
this->connection->sendPacket(packet, false, false); this->connection->send_packet(packet, false, false);
} }
void VoiceClient::send_voice_whisper_packet(const pipes::buffer_view &voice_buffer, const SpeakingClient::VoicePacketFlags &flags) { void VoiceClient::send_voice_whisper_packet(const pipes::buffer_view &voice_buffer, const SpeakingClient::VoicePacketFlags &flags) {
@ -308,5 +308,5 @@ void VoiceClient::send_voice_whisper_packet(const pipes::buffer_view &voice_buff
} }
memcpy(packet->data().data_ptr<void>(), voice_buffer.data_ptr<void>(), voice_buffer.length()); memcpy(packet->data().data_ptr<void>(), voice_buffer.data_ptr<void>(), voice_buffer.length());
this->connection->sendPacket(packet, false, false); this->connection->send_packet(packet, false, false);
} }

View File

@ -29,9 +29,17 @@ using namespace ts::connection;
using namespace ts::protocol; using namespace ts::protocol;
using namespace ts::server; using namespace ts::server;
VoiceClientConnection::VoiceClientConnection(VoiceClient* client) : client(client) { VoiceClientConnection::VoiceClientConnection(VoiceClient* client) :
packet_encoder_{&this->crypt_handler, &this->compress_handler, &this->acknowledge_handler},
packet_decoder_{&this->crypt_handler, &this->compress_handler, &this->acknowledge_handler} {
memtrack::allocated<VoiceClientConnection>(this); memtrack::allocated<VoiceClientConnection>(this);
this->packet_encoder_.callback_encoded_buffers = std::bind(&VoiceClientConnection::handle_encoded_buffers, this, std::placeholders::_1);
this->packet_encoder_.callback_encode_failed = std::bind(&VoiceClientConnection::handle_encode_error, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
this->server_id = client->getServerId();
this->client_handle = client;
this->crypt_handler.reset(); this->crypt_handler.reset();
debugMessage(client->getServer()->getServerId(), "Allocated new voice client connection at {}", (void*) this); debugMessage(client->getServer()->getServerId(), "Allocated new voice client connection at {}", (void*) this);
} }
@ -43,19 +51,16 @@ VoiceClientConnection::~VoiceClientConnection() {
this->write_queue.clear(); this->write_queue.clear();
} }
for(auto& category : this->write_preprocess_queues) { this->client_handle = nullptr;
lock_guard work_lock{category.work_lock};
lock_guard queue_lock{category.queue_lock};
category.queue.clear();
}
this->client = nullptr;
memtrack::freed<VoiceClientConnection>(this); memtrack::freed<VoiceClientConnection>(this);
} }
void VoiceClientConnection::triggerWrite() { void VoiceClientConnection::register_client_for_write() {
if(this->client->voice_server) std::shared_lock client_lock{this->client_mutex};
this->client->voice_server->triggerWrite(dynamic_pointer_cast<VoiceClient>(this->client->_this.lock())); if(!this->client_handle) return;
if(this->client_handle->voice_server)
this->client_handle->voice_server->triggerWrite(dynamic_pointer_cast<VoiceClient>(this->client_handle->_this.lock()));
} }
#ifdef CLIENT_LOG_PREFIX #ifdef CLIENT_LOG_PREFIX
@ -372,50 +377,23 @@ bool VoiceClientConnection::next_reassembled_command(unique_lock<std::recursive_
} }
void VoiceClientConnection::sendPacket(const shared_ptr<protocol::ServerPacket>& original_packet, bool copy, bool prepare_directly) { void VoiceClientConnection::send_packet(const shared_ptr<protocol::ServerPacket>& original_packet, bool copy, bool prepare_directly) {
if(this->client->state == ConnectionState::DISCONNECTED) if(this->client->state == ConnectionState::DISCONNECTED)
return; return;
shared_ptr<protocol::ServerPacket> packet; using EncodeFlags = server::server::udp::PacketEncoder::EncodeFlags;
if(copy) { int flags{EncodeFlags::none};
packet = protocol::ServerPacket::from_buffer(original_packet->buffer().dup(buffer::allocate_buffer(original_packet->buffer().length()))); if(!copy)
if(original_packet->getListener()) flags |= EncodeFlags::no_copy;
packet->setListener(std::move(original_packet->getListener())); if(prepare_directly)
packet->memory_state.flags = original_packet->memory_state.flags; flags |= EncodeFlags::sync;
} else {
packet = original_packet;
}
auto type = WritePreprocessCategory::from_type(packet->type().type()); if(this->packet_encoder_.encode_packet(original_packet, (EncodeFlags) flags))
auto& queue = this->write_preprocess_queues[type]; this->register_client_for_write();
if(prepare_directly) {
vector<pipes::buffer> buffers;
this->prepare_process_count++;
{
unique_lock work_lock{queue.work_lock};
if(!this->prepare_packet_for_write(buffers, packet, work_lock)) {
logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client));
this->prepare_process_count--;
return;
}
}
/* enqueue buffers for write */
{
lock_guard write_queue_lock(this->write_queue_lock);
this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end());
}
this->prepare_process_count--; /* we're now done preparing */
} else {
lock_guard queue_lock{queue.queue_lock};
queue.queue.push_back(packet);
queue.has_work = true;
}
this->triggerWrite();
} }
bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &result, const shared_ptr<ServerPacket> &packet, std::unique_lock<std::mutex>& work_lock) { #if 0
bool VoiceClientConnection::encode_packet(vector<pipes::buffer> &result, const shared_ptr<ServerPacket> &packet, std::unique_lock<std::mutex>& work_lock) {
assert(work_lock.owns_lock()); assert(work_lock.owns_lock());
string error = "success"; string error = "success";
@ -518,68 +496,50 @@ bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &resu
return true; return true;
} }
#endif
bool VoiceClientConnection::preprocess_write_packets() { void VoiceClientConnection::handle_encode_error(const shared_ptr<protocol::ServerPacket> &packet,
std::shared_ptr<ServerPacket> packet{nullptr}; ts::server::server::udp::PacketEncodeResult &result, std::string &message) {
vector<pipes::buffer> buffers{}; using PacketEncodeResult = ts::server::server::udp::PacketEncodeResult;
bool flag_more{false}; switch (result) {
case PacketEncodeResult::PACKET_TOO_LARGE:
logWarning(this->server_id, "{} Dropping packet of type {}. Packet is too large ({}bytes).", this->client_log_prefix(), packet->type().name(), packet->length());
break;
prepare_process_count++; /* we're not preparing a packet */ case PacketEncodeResult::COMPRESS_FAILED:
for(auto& category : this->write_preprocess_queues) { logWarning(this->server_id, "{} Dropping packet of type {}. Failed to compress packet ({}).", this->client_log_prefix(), packet->type().name(), message);
if(!category.has_work) continue; break;
else if(packet) {
flag_more = true; case PacketEncodeResult::ENCRYPT_KEY_GEN_FAILED:
logWarning(this->server_id, "{} Dropping packet of type {}. Failed to generate crypto key for packet.", this->client_log_prefix(), packet->type().name());
break;
case PacketEncodeResult::ENCRYPT_FAILED:
logWarning(this->server_id, "{} Dropping packet of type {}. Failed to encrypt packet ({}).", this->client_log_prefix(), packet->type().name(), message);
break;
case PacketEncodeResult::SUCCESS:
break; break;
} }
unique_lock work_lock{category.work_lock, try_to_lock};
if(!work_lock) continue; /* This particular category will already be processed */
{
lock_guard buffer_lock{category.queue_lock};
if(category.queue.empty()) {
category.has_work = false;
continue;
}
packet = std::move(category.queue.front());
category.queue.pop_front();
category.has_work = !category.queue.empty();
flag_more = category.has_work;
}
if(!this->prepare_packet_for_write(buffers, packet, work_lock)) {
logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client));
if(flag_more)
break;
else
continue; /* find out if we have more */
}
if(flag_more)
break;
else
continue; /* find out if we have more */
}
/* enqueue buffers for write */
if(!buffers.empty()) {
lock_guard write_queue_lock(this->write_queue_lock);
this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end());
}
this->prepare_process_count--; /* we're now done preparing */
return flag_more;
} }
int VoiceClientConnection::pop_write_buffer(pipes::buffer& target) { void VoiceClientConnection::handle_encoded_buffers(const std::vector<pipes::buffer> &buffers) {
if(this->client->state == DISCONNECTED) {
return 2; std::lock_guard lock{this->write_queue_lock};
this->write_queue.insert(this->write_queue.begin(), buffers.begin(), buffers.end());
}
this->register_client_for_write();
}
lock_guard write_queue_lock(this->write_queue_lock); bool VoiceClientConnection::encode_packets() {
return this->packet_encoder_.do_encode();
}
WriteBufferStatus VoiceClientConnection::pop_write_buffer(pipes::buffer& target) {
lock_guard wqlock(this->write_queue_lock);
size_t size = this->write_queue.size(); size_t size = this->write_queue.size();
if(size == 0) if(size == 0)
return 2; return WriteBufferStatus::EMPTY;
target = std::move(this->write_queue.front()); target = std::move(this->write_queue.front());
this->write_queue.pop_front(); this->write_queue.pop_front();
@ -596,11 +556,12 @@ int VoiceClientConnection::pop_write_buffer(pipes::buffer& target) {
} }
#endif #endif
#endif #endif
return size > 1; return size > 1 ? WriteBufferStatus::BUFFERS_LEFT : WriteBufferStatus::EMPTY;
} }
bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) { bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) {
while(true) { while(true) {
#if 0
for(auto& queue : this->write_preprocess_queues) { for(auto& queue : this->write_preprocess_queues) {
{ {
lock_guard lock{queue.queue_lock}; lock_guard lock{queue.queue_lock};
@ -621,6 +582,7 @@ bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_poin
if(this->prepare_process_count != 0) if(this->prepare_process_count != 0)
goto _wait; goto _wait;
} }
#endif
break; break;
_wait: _wait:
@ -633,16 +595,11 @@ bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_poin
} }
void VoiceClientConnection::reset() { void VoiceClientConnection::reset() {
for(auto& queue : this->write_preprocess_queues) { this->packet_encoder_.reset();
{
lock_guard lock{queue.queue_lock};
queue.queue.clear();
}
}
this->acknowledge_handler.reset(); this->acknowledge_handler.reset();
this->crypt_handler.reset(); this->crypt_handler.reset();
this->packet_id_manager.reset();
{ {
lock_guard buffer_lock(this->packet_buffer_lock); lock_guard buffer_lock(this->packet_buffer_lock);

View File

@ -15,6 +15,8 @@
#include "VoiceClient.h" #include "VoiceClient.h"
#include "protocol/AcknowledgeManager.h" #include "protocol/AcknowledgeManager.h"
#include <protocol/generation.h> #include <protocol/generation.h>
#include "./PacketEncoder.h"
#include "./PacketDecoder.h"
//#define LOG_ACK_SYSTEM //#define LOG_ACK_SYSTEM
#ifdef LOG_ACK_SYSTEM #ifdef LOG_ACK_SYSTEM
@ -28,16 +30,9 @@ namespace ts {
namespace server { namespace server {
class VoiceClient; class VoiceClient;
class VoiceServer; class VoiceServer;
class POWHandler;
} }
namespace connection { namespace connection {
class VoiceClientConnection {
friend class AcknowledgeManager;
friend class server::VoiceServer;
friend class server::VoiceClient;
friend class server::POWHandler;
public:
struct CommandFragment { struct CommandFragment {
uint16_t packet_id{0}; uint16_t packet_id{0};
uint16_t packet_generation{0}; uint16_t packet_generation{0};
@ -47,8 +42,8 @@ namespace ts {
pipes::buffer payload{}; pipes::buffer payload{};
CommandFragment() { this->payload_length = 0; } CommandFragment() { this->payload_length = 0; }
CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload) : packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags}, CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload)
payload_length{payloadLength}, payload{std::move(payload)} {} : packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags}, payload_length{payloadLength}, payload{std::move(payload)} {}
CommandFragment& operator=(const CommandFragment&) = default; CommandFragment& operator=(const CommandFragment&) = default;
CommandFragment(const CommandFragment& other) = default; CommandFragment(const CommandFragment& other) = default;
@ -56,49 +51,58 @@ namespace ts {
}; };
static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer)); static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer));
enum struct WriteBufferStatus {
EMPTY,
BUFFERS_LEFT,
NO_CHANGES,
UNSET
};
class VoiceClientConnection {
friend class server::VoiceServer;
friend class server::VoiceClient;
public:
typedef protocol::PacketRingBuffer<CommandFragment, 32, CommandFragment> command_fragment_buffer_t; typedef protocol::PacketRingBuffer<CommandFragment, 32, CommandFragment> command_fragment_buffer_t;
typedef std::array<command_fragment_buffer_t, 2> command_packet_reassembler; typedef std::array<command_fragment_buffer_t, 2> command_packet_reassembler;
explicit VoiceClientConnection(server::VoiceClient*); explicit VoiceClientConnection(server::VoiceClient*);
virtual ~VoiceClientConnection(); virtual ~VoiceClientConnection();
void sendPacket(const std::shared_ptr<protocol::ServerPacket>& original_packet, bool copy = false, bool prepare_directly = false); [[nodiscard]] inline CryptHandler* getCryptHandler(){ return &crypt_handler; }
//[[nodiscard]] inline server::VoiceClient* getClient(){ return client; }
CryptHandler* getCryptHandler(){ return &crypt_handler; } void send_packet(const std::shared_ptr<protocol::ServerPacket>& original_packet, bool copy = false, bool prepare_directly = false);
server::VoiceClient* getClient(){ return client; }
#ifdef VC_USE_READ_QUEUE
bool handleNextDatagram();
#endif
/* /*
* Split packets waiting in write_process_queue and moves the final buffers to writeQueue. * Split packets waiting in write_process_queue and moves the final buffers to writeQueue.
* @returns true when there are more packets to prepare * @returns true when there are more packets to prepare
*/ */
bool preprocess_write_packets(); bool encode_packets();
/* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */ /* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */
int pop_write_buffer(pipes::buffer& /* buffer */); [[nodiscard]] WriteBufferStatus pop_write_buffer(pipes::buffer& /* buffer */);
bool wait_empty_write_and_prepare_queue(std::chrono::time_point<std::chrono::system_clock> until = std::chrono::time_point<std::chrono::system_clock>()); bool wait_empty_write_and_prepare_queue(std::chrono::time_point<std::chrono::system_clock> until = std::chrono::time_point<std::chrono::system_clock>());
protocol::PacketIdManager& getPacketIdManager() { return this->packet_id_manager; }
void reset(); void reset();
void force_insert_command(const pipes::buffer_view& /* payload */); void force_insert_command(const pipes::buffer_view& /* payload */);
void register_initiv_packet(); void register_initiv_packet();
//buffer::SortedBufferQueue<protocol::ClientPacket>** getReadQueue() { return this->readTypedQueue; }
protected: protected:
void handle_incoming_datagram(const pipes::buffer_view &buffer); void handle_incoming_datagram(const pipes::buffer_view &buffer);
bool verify_encryption(const pipes::buffer_view& /* full packet */); bool verify_encryption(const pipes::buffer_view& /* full packet */);
void triggerWrite(); void register_client_for_write();
private: private:
server::VoiceClient* client = nullptr; VirtualServerId server_id{0};
std::shared_mutex client_mutex{};
server::VoiceClient* client_handle{nullptr};
//Decryption / encryption stuff CryptHandler crypt_handler{};
CryptHandler crypt_handler; /* access to CryptHandler is thread save */ CompressionHandler compress_handler{};
CompressionHandler compress_handler; AcknowledgeManager acknowledge_handler{};
AcknowledgeManager acknowledge_handler;
//Handle stuff //Handle stuff
void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */); void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */);
@ -109,69 +113,21 @@ namespace ts {
spin_lock write_queue_lock; /* queue access isn't for long in general */ spin_lock write_queue_lock; /* queue access isn't for long in general */
std::deque<pipes::buffer> write_queue; std::deque<pipes::buffer> write_queue;
struct WritePreprocessCategory { server::server::udp::PacketEncoder packet_encoder_;
enum value { server::server::udp::PacketDecoder packet_decoder_;
PING_PONG = 0, //Ping/Pongs /* will be called on the IO thread or if sync has been set directly in any thread */
ACK = 2, void handle_encode_error(const std::shared_ptr<protocol::ServerPacket> &/* the packet */, ts::server::server::udp::PacketEncodeResult& /* error */, std::string& /* custom message */);
VOICE_WHISPER = 1, //Voice/Whisper void handle_encoded_buffers(const std::vector<pipes::buffer>& /* buffers */);
COMMAND = 3,
INIT = 4,
MAX = INIT /* will be called on the IO thread */
}; void handle_decoded_packet(const protocol::ClientPacketParser&);
void handle_decode_error();
inline static value from_type(protocol::PacketType type) {
switch(type) {
case protocol::PING:
case protocol::PONG:
return value::PING_PONG;
case protocol::VOICE:
case protocol::VOICE_WHISPER:
return value::VOICE_WHISPER;
case protocol::ACK:
case protocol::ACK_LOW:
return value::ACK;
case protocol::COMMAND:
case protocol::COMMAND_LOW:
return value::COMMAND;
default:
return value::INIT;
}
}
};
struct WritePreprocessQueue {
int _zero1{0};
bool has_work{false};
std::mutex work_lock{};
spin_lock queue_lock{};
std::deque<std::shared_ptr<protocol::ServerPacket>> queue{};
int _zero{0};
};
std::array<WritePreprocessQueue, WritePreprocessCategory::MAX> write_preprocess_queues{};
/* ---------- Processing ---------- */
/* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */
protocol::PacketIdManager packet_id_manager;
/* this function is thread save :) */
std::atomic<uint8_t> prepare_process_count{0}; /* current thread count preparing a packet */
bool prepare_packet_for_write(std::vector<pipes::buffer> &/* buffers which need to be transferred */, const std::shared_ptr<protocol::ServerPacket> &/* the packet */, std::unique_lock<std::mutex>& /* work lock */);
std::array<protocol::generation_estimator, 9> incoming_generation_estimators{}; /* implementation is thread save */
std::recursive_mutex packet_buffer_lock;
command_packet_reassembler _command_fragment_buffers;
static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) { static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) {
return packet_index & 0x1U; /* use 0 for command and 1 for command low */ return packet_index & 0x1U; /* use 0 for command and 1 for command low */
} }
[[nodiscard]] std::string client_log_prefix();
}; };
} }
} }

View File

@ -69,7 +69,7 @@ void VoiceClient::handlePacketPing(const protocol::ClientPacketParser& packet) {
le2be16(packet.packet_id(), buffer); le2be16(packet.packet_id(), buffer);
auto pkt = make_shared<ServerPacket>(PacketTypeInfo::Pong, pipes::buffer_view{buffer, 2}); auto pkt = make_shared<ServerPacket>(PacketTypeInfo::Pong, pipes::buffer_view{buffer, 2});
pkt->enable_flag(PacketFlag::Unencrypted); pkt->enable_flag(PacketFlag::Unencrypted);
this->connection->sendPacket(pkt); this->connection->send_packet(pkt);
} }
void VoiceClient::handlePacketVoice(const protocol::ClientPacketParser& packet) { void VoiceClient::handlePacketVoice(const protocol::ClientPacketParser& packet) {

View File

@ -14,7 +14,7 @@ void VoiceClient::sendPingRequest() {
auto packet = make_shared<ServerPacket>(PacketTypeInfo::Ping, pipes::buffer_view{}); auto packet = make_shared<ServerPacket>(PacketTypeInfo::Ping, pipes::buffer_view{});
packet->enable_flag(PacketFlag::Unencrypted); packet->enable_flag(PacketFlag::Unencrypted);
this->connection->sendPacket(packet, false, true); /* prepare directly so the packet gets a packet id */ this->connection->send_packet(packet, false, true); /* prepare directly so the packet gets a packet id */
this->lastPingId = packet->packetId(); this->lastPingId = packet->packetId();

View File

@ -84,7 +84,7 @@ namespace ts::server {
/* Use channel group id 0 to delete any assignment */ /* Use channel group id 0 to delete any assignment */
GroupAssignmentResult set_channel_group(ClientDbId /* client database id */, GroupId /* group id */, ChannelId /* channel id */, bool /* temporary assignment */); GroupAssignmentResult set_channel_group(ClientDbId /* client database id */, GroupId /* group id */, ChannelId /* channel id */, bool /* temporary assignment */);
std::deque<property::ClientProperties> update_client_group_properties(const std::shared_ptr<server::ConnectedClient> &client, ChannelId /* target channel */); std::deque<property::ClientProperties> update_client_group_properties(const std::shared_ptr<ConnectedClient> &client, ChannelId /* target channel */);
void cleanup_assignments(); void cleanup_assignments();
void cleanup_channel_assignments(ChannelId /* channel */); void cleanup_channel_assignments(ChannelId /* channel */);

View File

@ -29,7 +29,7 @@ namespace ts {
class TokenManager { class TokenManager {
public: public:
TokenManager(server::VirtualServer*); explicit TokenManager(::ts::server::VirtualServer*);
~TokenManager(); ~TokenManager();
bool loadTokens(); bool loadTokens();
@ -39,7 +39,7 @@ namespace ts {
bool deleteToke(const std::string&); bool deleteToke(const std::string&);
private: private:
int loadTokenFromDb(int length, char** values, char** columns); int loadTokenFromDb(int length, char** values, char** columns);
server::VirtualServer* handle; ts::server::VirtualServer* handle;
std::vector<std::shared_ptr<TokenEntry>> tokens; std::vector<std::shared_ptr<TokenEntry>> tokens;
}; };
} }

View File

@ -47,19 +47,19 @@ void MusicBotManager::cleanup_client_bots(ts::ClientDbId clientid) {
this->deleteBot(bot); this->deleteBot(bot);
} }
std::deque<std::shared_ptr<server::MusicClient>> MusicBotManager::available_bots() { std::deque<std::shared_ptr<MusicClient>> MusicBotManager::available_bots() {
lock_guard lock(music_bots_lock); lock_guard lock(music_bots_lock);
return this->music_bots; return this->music_bots;
} }
std::shared_ptr<server::MusicClient> MusicBotManager::find_bot_by_playlist(const std::shared_ptr<ts::music::PlayablePlaylist> &playlist) { std::shared_ptr<MusicClient> MusicBotManager::find_bot_by_playlist(const std::shared_ptr<ts::music::PlayablePlaylist> &playlist) {
for(const auto& bot : this->available_bots()) for(const auto& bot : this->available_bots())
if(bot->playlist() == playlist) if(bot->playlist() == playlist)
return bot; return bot;
return nullptr; return nullptr;
} }
std::deque<std::shared_ptr<server::MusicClient>> MusicBotManager::listBots(ClientDbId clid) { std::deque<std::shared_ptr<MusicClient>> MusicBotManager::listBots(ClientDbId clid) {
lock_guard lock(music_bots_lock); lock_guard lock(music_bots_lock);
std::deque<std::shared_ptr<server::MusicClient>> res; std::deque<std::shared_ptr<server::MusicClient>> res;
for(const auto& bot : this->music_bots) for(const auto& bot : this->music_bots)
@ -67,7 +67,7 @@ std::deque<std::shared_ptr<server::MusicClient>> MusicBotManager::listBots(Clien
return res; return res;
} }
std::shared_ptr<server::MusicClient> MusicBotManager::createBot(ClientDbId owner) { std::shared_ptr<MusicClient> MusicBotManager::createBot(ClientDbId owner) {
if(!config::license->isPremium()) { if(!config::license->isPremium()) {
if(this->current_bot_count() >= this->max_bots()) return nullptr; //Test the license if(this->current_bot_count() >= this->max_bots()) return nullptr; //Test the license
} }
@ -177,7 +177,7 @@ int MusicBotManager::current_bot_count() {
return this->music_bots.size(); return this->music_bots.size();
} }
std::shared_ptr<server::MusicClient> MusicBotManager::findBotById(ClientDbId id) { std::shared_ptr<MusicClient> MusicBotManager::findBotById(ClientDbId id) {
lock_guard lock(music_bots_lock); lock_guard lock(music_bots_lock);
for(const auto& bot : this->music_bots) for(const auto& bot : this->music_bots)
if(bot->getClientDatabaseId() == id) return bot; if(bot->getClientDatabaseId() == id) return bot;

View File

@ -3,6 +3,7 @@
#include <deque> #include <deque>
#include <event.h> #include <event.h>
#include <ThreadPool/Thread.h> #include <ThreadPool/Thread.h>
#include <ThreadPool/Mutex.h>
#include <condition_variable> #include <condition_variable>
#include <pipes/buffer.h> #include <pipes/buffer.h>
#include <misc/spin_lock.h> #include <misc/spin_lock.h>

View File

@ -28,7 +28,7 @@ extern InstanceHandler* serverInstance;
VoiceServer::VoiceServer(const std::shared_ptr<VirtualServer>& server) { VoiceServer::VoiceServer(const std::shared_ptr<VirtualServer>& server) {
this->server = server; this->server = server;
this->pow_handler = make_unique<POWHandler>(this); this->pow_handler = make_unique<server::udp::POWHandler>(this);
} }
VoiceServer::~VoiceServer() { } VoiceServer::~VoiceServer() { }
@ -169,7 +169,7 @@ void VoiceServer::execute_resend(const std::chrono::system_clock::time_point &no
} }
//logTrace(client->getServerId(), "{} Resending {} packets.", CLIENT_STR_LOG_PREFIX_(client), buffers.size()); //logTrace(client->getServerId(), "{} Resending {} packets.", CLIENT_STR_LOG_PREFIX_(client), buffers.size());
buffers.clear(); buffers.clear();
connection->triggerWrite(); connection->register_client_for_write();
} }
} }
} }
@ -447,7 +447,7 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle)
auto voice_server = event_handle->voice_server; auto voice_server = event_handle->voice_server;
bool retrigger = false; bool retrigger = false;
int buffer_state; connection::WriteBufferStatus buffer_state{connection::WriteBufferStatus::UNSET};
IOData<0x100> io{}; IOData<0x100> io{};
io.file_descriptor = fd; io.file_descriptor = fd;
@ -472,15 +472,15 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle)
auto client_ptr = &*client; auto client_ptr = &*client;
TIMING_STEP(timings, "client get"); TIMING_STEP(timings, "client get");
more_to_prepare = connection->preprocess_write_packets(); more_to_prepare = connection->encode_packets();
TIMING_STEP(timings, "client prepare"); TIMING_STEP(timings, "client prepare");
while(system_clock::now() <= write_timeout) { while(system_clock::now() <= write_timeout) {
buffer_state = connection->pop_write_buffer(buffer); buffer_state = connection->pop_write_buffer(buffer);
more_to_write = buffer_state == 1; more_to_write = buffer_state == connection::WriteBufferStatus::BUFFERS_LEFT;
TIMING_STEP(timings, "buffer pop"); TIMING_STEP(timings, "buffer pop");
if(buffer_state != 2) { if(buffer_state != connection::WriteBufferStatus::NO_CHANGES) {
ssize_t res = write_datagram(io, client_ptr->remote_address, &client_ptr->address_info, buffer.length(), buffer.data_ptr()); ssize_t res = write_datagram(io, client_ptr->remote_address, &client_ptr->address_info, buffer.length(), buffer.data_ptr());
TIMING_STEP(timings, "buffer write"); TIMING_STEP(timings, "buffer write");
if(res != buffer.length()){ if(res != buffer.length()){

View File

@ -19,7 +19,6 @@ namespace ts {
class VirtualServer; class VirtualServer;
class ConnectedClient; class ConnectedClient;
class VoiceClient; class VoiceClient;
class POWHandler;
struct VoiceServerBinding { struct VoiceServerBinding {
sockaddr_storage address{}; sockaddr_storage address{};
@ -62,7 +61,7 @@ namespace ts {
inline std::shared_ptr<VirtualServer> get_server() { return this->server; } inline std::shared_ptr<VirtualServer> get_server() { return this->server; }
private: private:
std::unique_ptr<POWHandler> pow_handler; std::unique_ptr<server::udp::POWHandler> pow_handler;
std::shared_ptr<VirtualServer> server = nullptr; std::shared_ptr<VirtualServer> server = nullptr;
bool running = false; bool running = false;

View File

@ -18,7 +18,7 @@ bool PuzzleManager::precompute_puzzles(size_t amount) {
std::mt19937 mt{rd()}; std::mt19937 mt{rd()};
while(this->precomputed_puzzle_count() < amount) while(this->precomputed_puzzle_count() < amount)
this->generate_puzzle(); this->generate_puzzle(mt);
return this->precomputed_puzzle_count() > 0; return this->precomputed_puzzle_count() > 0;
} }

View File

@ -6,9 +6,14 @@
#include <misc/spin_lock.h> #include <misc/spin_lock.h>
#include <Definitions.h> #include <Definitions.h>
#include <mutex> #include <mutex>
#include <deque>
namespace ts::server { namespace ts::server {
class VoiceClient; class VoiceClient;
namespace vserver {
class VirtualServerBase;
}
} }
namespace ts::server::server::udp { namespace ts::server::server::udp {
@ -75,23 +80,24 @@ namespace ts::server::server::udp {
}; };
struct io_binding { struct io_binding {
VirtualServerId server_id{0}; vserver::VirtualServerBase* virtual_server{nullptr};
sockaddr_storage address{}; sockaddr_storage address{};
size_t loop_entry_index{0}; size_t loop_entry_index{0};
std::vector<io_loop_entry*> loop_entries{}; std::vector<io_loop_entry*> loop_entries{};
struct server_client {
std::shared_ptr<VoiceClient> client{};
ClientId client_id{0};
};
std::mutex client_lock{};
std::deque<server_client> known_clients{};
}; };
class Server { class Server {
public: public:
Server();
~Server();
bool initialize(std::string& /* error */);
void finalize();
void register_virtual_server(vserver::VirtualServerBase* /* server */);
/* this will block until all executions have been finished */
void unregister_virtual_server(vserver::VirtualServerBase* /* server */);
void schedule_client_write(const std::shared_ptr<VoiceClient>& /* client */); void schedule_client_write(const std::shared_ptr<VoiceClient>& /* client */);
@ -102,6 +108,6 @@ namespace ts::server::server::udp {
std::vector<io_loop*> io_loops{}; std::vector<io_loop*> io_loops{};
std::mutex bindings_lock{}; std::mutex bindings_lock{};
std::vector<io_binding*> io_bindings{}; std::vector<io_binding*> io_bindings{}; /* may contains nullptr! */
}; };
} }

View File

@ -344,7 +344,7 @@ PermissionResetResult PermissionService::reset_server_permissions() {
if(!group) { if(!group) {
logError(this->get_server_id(), "Could not find server admin group from template name ({}). We're not generating an admin token.", group_name, group); logError(this->get_server_id(), "Could not find server admin group from template name ({}). We're not generating an admin token.", group_name, group);
} else { } else {
auto token = vs->token_manager().createToken(server::tokens::TOKEN_SERVER, group->group_id(), "Default server token for the server admin."); auto token = vs->token_manager().createToken(ts::server::tokens::TOKEN_SERVER, group->group_id(), "Default server token for the server admin.");
if(!token) { if(!token) {
logError(this->get_server_id(), "Failed to generate default server admin token."); logError(this->get_server_id(), "Failed to generate default server admin token.");
} else { } else {

2
shared

@ -1 +1 @@
Subproject commit 9533fe8920ea82313dcd49a4e003f39e50c7d81e Subproject commit 2ffa12489d4c7b16789ec2a93d82d02ee412b264