Merge branch 'master' into 1.4.0
This commit is contained in:
commit
0b2ec5ac0a
@ -42,9 +42,12 @@ VoiceClientConnection::~VoiceClientConnection() {
|
||||
lock_guard write_queue_lock(this->write_queue_lock);
|
||||
this->write_queue.clear();
|
||||
}
|
||||
{
|
||||
lock_guard write_queue_lock(this->write_prepare_queue_lock);
|
||||
this->write_prepare_queue.clear();
|
||||
|
||||
for(auto& category : this->write_preprocess_queues) {
|
||||
lock_guard work_lock{category.work_lock};
|
||||
lock_guard queue_lock{category.queue_lock};
|
||||
|
||||
category.queue.clear();
|
||||
}
|
||||
this->client = nullptr;
|
||||
memtrack::freed<VoiceClientConnection>(this);
|
||||
@ -449,7 +452,8 @@ unique_ptr<protocol::ClientPacket> VoiceClientConnection::next_reassembled_packe
|
||||
|
||||
|
||||
void VoiceClientConnection::sendPacket(const shared_ptr<protocol::ServerPacket>& original_packet, bool copy, bool prepare_directly) {
|
||||
if(this->client->state == ConnectionState::DISCONNECTED) return;
|
||||
if(this->client->state == ConnectionState::DISCONNECTED)
|
||||
return;
|
||||
|
||||
shared_ptr<protocol::ServerPacket> packet;
|
||||
if(copy) {
|
||||
@ -461,14 +465,20 @@ void VoiceClientConnection::sendPacket(const shared_ptr<protocol::ServerPacket>&
|
||||
packet = original_packet;
|
||||
}
|
||||
|
||||
auto type = WritePreprocessCategory::from_type(packet->type().type());
|
||||
auto& queue = this->write_preprocess_queues[type];
|
||||
if(prepare_directly) {
|
||||
vector<pipes::buffer> buffers;
|
||||
this->prepare_process_count++;
|
||||
if(!this->prepare_packet_for_write(buffers, packet)) {
|
||||
|
||||
{
|
||||
unique_lock work_lock{queue.work_lock};
|
||||
if(!this->prepare_packet_for_write(buffers, packet, work_lock)) {
|
||||
logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client));
|
||||
this->prepare_process_count--;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* enqueue buffers for write */
|
||||
{
|
||||
@ -477,13 +487,16 @@ void VoiceClientConnection::sendPacket(const shared_ptr<protocol::ServerPacket>&
|
||||
}
|
||||
this->prepare_process_count--; /* we're now done preparing */
|
||||
} else {
|
||||
lock_guard prepare_queue_lock(this->write_prepare_queue_lock);
|
||||
this->write_prepare_queue.push_back(packet);
|
||||
lock_guard queue_lock{queue.queue_lock};
|
||||
queue.queue.push_back(packet);
|
||||
queue.has_work = true;
|
||||
}
|
||||
this->triggerWrite();
|
||||
}
|
||||
|
||||
bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &result, const shared_ptr<ServerPacket> &packet) {
|
||||
bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &result, const shared_ptr<ServerPacket> &packet, std::unique_lock<std::mutex>& work_lock) {
|
||||
assert(work_lock.owns_lock());
|
||||
|
||||
string error = "success";
|
||||
|
||||
if(packet->type().compressable() && !packet->memory_state.fragment_entry) {
|
||||
@ -497,7 +510,7 @@ bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &resu
|
||||
std::vector<shared_ptr<ServerPacket>> fragments;
|
||||
fragments.reserve((size_t) (packet->data().length() / packet->type().max_length()) + 1);
|
||||
|
||||
if(packet->data().length() > packet->type().max_length()){
|
||||
if(packet->data().length() > packet->type().max_length()) {
|
||||
if(!packet->type().fragmentable()) {
|
||||
logError(this->client->getServerId(), "{} We've tried to send a too long, not fragmentable, packet. Dropping packet of type {} with length {}", CLIENT_STR_LOG_PREFIX_(this->client), packet->type().name(), packet->data().length());
|
||||
return false;
|
||||
@ -533,21 +546,21 @@ bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &resu
|
||||
|
||||
if(packet->getListener())
|
||||
fragments.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :)
|
||||
} else
|
||||
} else {
|
||||
fragments.push_back(packet);
|
||||
}
|
||||
|
||||
result.reserve(fragments.size());
|
||||
|
||||
/* apply packet ids */
|
||||
{
|
||||
lock_guard id_lock(this->packet_id_manager_lock);
|
||||
for(const auto& fragment : fragments) {
|
||||
if(!fragment->memory_state.id_branded)
|
||||
fragment->applyPacketId(this->packet_id_manager);
|
||||
}
|
||||
}
|
||||
work_lock.unlock(); /* the rest could be unordered */
|
||||
|
||||
|
||||
auto statistics = this->client && this->client->getServer() ? this->client->connectionStatistics : nullptr;
|
||||
auto statistics = this->client ? this->client->connectionStatistics : nullptr;
|
||||
for(const auto& fragment : fragments) {
|
||||
if(!this->crypt_handler.progressPacketOut(fragment.get(), error, false)){
|
||||
logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error);
|
||||
@ -564,50 +577,50 @@ bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &resu
|
||||
return true;
|
||||
}
|
||||
|
||||
#ifdef VC_USE_READ_QUEUE
|
||||
bool VoiceClientConnection::handleNextDatagram() {
|
||||
|
||||
bool flag_empty;
|
||||
pipes::buffer buffer;
|
||||
{
|
||||
lock_guard<recursive_mutex> queue_lock(this->queueLock);
|
||||
if(this->readQueue.empty()) return false;
|
||||
buffer = std::move(this->readQueue.front());
|
||||
this->readQueue.pop_front();
|
||||
flag_empty = this->readQueue.empty();
|
||||
};
|
||||
|
||||
try {
|
||||
this->handleDatagramReceived(buffer);
|
||||
} catch (std::exception& e) {
|
||||
logCritical(this->client->getServerId(), "Handling of raw packet thrown an uncaught exception! (Message: " + string(e.what()) + ")");
|
||||
}
|
||||
return flag_empty;
|
||||
}
|
||||
#endif
|
||||
|
||||
bool VoiceClientConnection::prepare_write_packets() {
|
||||
/* get the next packet to prepare */
|
||||
unique_lock prepare_queue_lock(this->write_prepare_queue_lock);
|
||||
if(this->write_prepare_queue.empty())
|
||||
return false;
|
||||
bool VoiceClientConnection::preprocess_write_packets() {
|
||||
std::shared_ptr<ServerPacket> packet{nullptr};
|
||||
vector<pipes::buffer> buffers{};
|
||||
bool flag_more{false};
|
||||
|
||||
prepare_process_count++; /* we're not preparing a packet */
|
||||
auto packet = move(this->write_prepare_queue.front());
|
||||
this->write_prepare_queue.pop_front();
|
||||
auto flag_more = !this->write_prepare_queue.empty();
|
||||
prepare_queue_lock.unlock();
|
||||
for(auto& category : this->write_preprocess_queues) {
|
||||
if(!category.has_work) continue;
|
||||
else if(packet) {
|
||||
flag_more = true;
|
||||
break;
|
||||
}
|
||||
|
||||
/* prepare the next packet */
|
||||
vector<pipes::buffer> buffers;
|
||||
if(!this->prepare_packet_for_write(buffers, packet)) {
|
||||
unique_lock work_lock{category.work_lock, try_to_lock};
|
||||
if(!work_lock) continue; /* This particular category will already be processed */
|
||||
|
||||
{
|
||||
lock_guard buffer_lock{category.queue_lock};
|
||||
if(category.queue.empty()) {
|
||||
category.has_work = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
packet = std::move(category.queue.front());
|
||||
category.queue.pop_front();
|
||||
category.has_work = !category.queue.empty();
|
||||
}
|
||||
|
||||
if(!this->prepare_packet_for_write(buffers, packet, work_lock)) {
|
||||
logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client));
|
||||
this->prepare_process_count--;
|
||||
return flag_more;
|
||||
if(flag_more)
|
||||
break;
|
||||
else
|
||||
continue; /* find out if we have more */
|
||||
}
|
||||
|
||||
if(flag_more)
|
||||
break;
|
||||
else
|
||||
continue; /* find out if we have more */
|
||||
}
|
||||
|
||||
/* enqueue buffers for write */
|
||||
{
|
||||
if(!buffers.empty()) {
|
||||
lock_guard write_queue_lock(this->write_queue_lock);
|
||||
this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end());
|
||||
}
|
||||
@ -645,13 +658,19 @@ int VoiceClientConnection::pop_write_buffer(pipes::buffer& target) {
|
||||
|
||||
bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) {
|
||||
while(true) {
|
||||
for(auto& queue : this->write_preprocess_queues) {
|
||||
{
|
||||
lock_guard prepare_lock{this->write_prepare_queue_lock};
|
||||
if(!this->write_prepare_queue.empty())
|
||||
lock_guard lock{queue.queue_lock};
|
||||
if(!queue.queue.empty())
|
||||
goto _wait;
|
||||
if(this->prepare_process_count != 0)
|
||||
}
|
||||
|
||||
{
|
||||
unique_lock lock{queue.work_lock, try_to_lock};
|
||||
if(!lock.owns_lock())
|
||||
goto _wait;
|
||||
}
|
||||
}
|
||||
{
|
||||
lock_guard buffer_lock{this->write_queue_lock};
|
||||
if(!this->write_queue.empty())
|
||||
@ -671,12 +690,16 @@ bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_poin
|
||||
}
|
||||
|
||||
void VoiceClientConnection::reset() {
|
||||
for(auto& queue : this->write_preprocess_queues) {
|
||||
{
|
||||
lock_guard lock{queue.queue_lock};
|
||||
queue.queue.clear();
|
||||
}
|
||||
}
|
||||
|
||||
this->acknowledge_handler.reset();
|
||||
this->crypt_handler.reset();
|
||||
{
|
||||
lock_guard manager_lock(this->packet_id_manager_lock);
|
||||
this->packet_id_manager.reset();
|
||||
}
|
||||
|
||||
{
|
||||
lock_guard buffer_lock(this->packet_buffer_lock);
|
||||
|
@ -55,7 +55,7 @@ namespace ts {
|
||||
* Split packets waiting in write_process_queue and moves the final buffers to writeQueue.
|
||||
* @returns true when there are more packets to prepare
|
||||
*/
|
||||
bool prepare_write_packets();
|
||||
bool preprocess_write_packets();
|
||||
/* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */
|
||||
int pop_write_buffer(pipes::buffer& /* buffer */);
|
||||
|
||||
@ -84,22 +84,64 @@ namespace ts {
|
||||
std::unique_ptr<protocol::ClientPacket> next_reassembled_packet(std::unique_lock<std::recursive_timed_mutex>& /* packet channel execute lock */, bool& /* have more */);
|
||||
|
||||
|
||||
#ifdef VC_USE_READ_QUEUE
|
||||
std::deque<pipes::buffer> readQueue;
|
||||
#endif
|
||||
|
||||
/* ---------- Write declarations ---------- */
|
||||
spin_lock write_queue_lock; /* queue access isn't for long in general */
|
||||
std::deque<pipes::buffer> write_queue;
|
||||
|
||||
spin_lock write_prepare_queue_lock; /* preprocess queue access isn't for long in general */
|
||||
std::deque<std::shared_ptr<protocol::ServerPacket>> write_prepare_queue;
|
||||
struct WritePreprocessCategory {
|
||||
enum value {
|
||||
PING_PONG = 0, //Ping/Pongs
|
||||
ACK = 2,
|
||||
VOICE_WHISPER = 1, //Voice/Whisper
|
||||
COMMAND = 3,
|
||||
INIT = 4,
|
||||
|
||||
spin_lock packet_id_manager_lock; /* packet id's must be generated in order; Calculating the ID should also not be take too much time */
|
||||
MAX = INIT
|
||||
};
|
||||
|
||||
inline static value from_type(protocol::PacketType type) {
|
||||
switch(type) {
|
||||
case protocol::PING:
|
||||
case protocol::PONG:
|
||||
return value::PING_PONG;
|
||||
|
||||
case protocol::VOICE:
|
||||
case protocol::VOICE_WHISPER:
|
||||
return value::VOICE_WHISPER;
|
||||
|
||||
case protocol::ACK:
|
||||
case protocol::ACK_LOW:
|
||||
return value::ACK;
|
||||
|
||||
case protocol::COMMAND:
|
||||
case protocol::COMMAND_LOW:
|
||||
return value::COMMAND;
|
||||
|
||||
default:
|
||||
return value::INIT;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct WritePreprocessQueue {
|
||||
int _zero1{0};
|
||||
bool has_work{false};
|
||||
std::mutex work_lock{};
|
||||
|
||||
spin_lock queue_lock{};
|
||||
std::deque<std::shared_ptr<protocol::ServerPacket>> queue{};
|
||||
|
||||
int _zero{0};
|
||||
};
|
||||
std::array<WritePreprocessQueue, WritePreprocessCategory::MAX> write_preprocess_queues{};
|
||||
|
||||
/* ---------- Processing ---------- */
|
||||
/* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */
|
||||
protocol::PacketIdManager packet_id_manager;
|
||||
|
||||
/* this function is thread save :) */
|
||||
std::atomic<uint8_t> prepare_process_count{0}; /* current thread count preparing a packet */
|
||||
bool prepare_packet_for_write(std::vector<pipes::buffer> &/* buffers which need to be transferred */, const std::shared_ptr<protocol::ServerPacket> &/* the packet */);
|
||||
bool prepare_packet_for_write(std::vector<pipes::buffer> &/* buffers which need to be transferred */, const std::shared_ptr<protocol::ServerPacket> &/* the packet */, std::unique_lock<std::mutex>& /* work lock */);
|
||||
|
||||
std::recursive_mutex packet_buffer_lock;
|
||||
packet_buffers_t _packet_buffers;
|
||||
|
@ -489,7 +489,7 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle)
|
||||
auto client_ptr = &*client;
|
||||
|
||||
TIMING_STEP(timings, "client get");
|
||||
more_to_prepare = connection->prepare_write_packets();
|
||||
more_to_prepare = connection->preprocess_write_packets();
|
||||
TIMING_STEP(timings, "client prepare");
|
||||
|
||||
while(system_clock::now() <= write_timeout) {
|
||||
|
Loading…
Reference in New Issue
Block a user