Teaspeak-Server/server/src/client/voice/VoiceClientConnection.h

180 lines
7.9 KiB
C
Raw Normal View History

#pragma once
#include <protocol/ringbuffer.h>
#include <protocol/CompressionHandler.h>
2020-01-27 02:21:39 +01:00
#include <protocol/CryptHandler.h>
#include <ThreadPool/Thread.h>
#include <ThreadPool/Mutex.h>
#include <protocol/buffers.h>
#include <chrono>
#include <deque>
#include <event.h>
#include <condition_variable>
2020-01-27 02:21:39 +01:00
#include <utility>
#include <pipes/buffer.h>
#include "VoiceClient.h"
#include "protocol/AcknowledgeManager.h"
2020-01-27 02:21:39 +01:00
#include <protocol/generation.h>
//#define LOG_ACK_SYSTEM
#ifdef LOG_ACK_SYSTEM
#define LOG_AUTO_ACK_REQUEST
#define LOG_AUTO_ACK_RESPONSE
#define LOG_PKT_RESEND
#endif
//#define PKT_LOG_PING
namespace ts {
namespace server {
class VoiceClient;
class VoiceServer;
2020-01-24 02:57:58 +01:00
class POWHandler;
}
namespace connection {
class VoiceClientConnection {
2020-01-24 02:57:58 +01:00
friend class AcknowledgeManager;
friend class server::VoiceServer;
friend class server::VoiceClient;
friend class server::POWHandler;
public:
2020-01-27 02:21:39 +01:00
struct CommandFragment {
uint16_t packet_id{0};
uint16_t packet_generation{0};
uint8_t packet_flags{0};
uint32_t payload_length : 24;
pipes::buffer payload{};
CommandFragment() { this->payload_length = 0; }
CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload) : packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags},
payload_length{payloadLength}, payload{std::move(payload)} {}
CommandFragment& operator=(const CommandFragment&) = default;
CommandFragment(const CommandFragment& other) = default;
CommandFragment(CommandFragment&&) = default;
};
static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer));
typedef protocol::PacketRingBuffer<CommandFragment, 32, CommandFragment> command_fragment_buffer_t;
typedef std::array<command_fragment_buffer_t, 2> command_packet_reassembler;
2020-01-24 02:57:58 +01:00
explicit VoiceClientConnection(server::VoiceClient*);
virtual ~VoiceClientConnection();
void sendPacket(const std::shared_ptr<protocol::ServerPacket>& original_packet, bool copy = false, bool prepare_directly = false);
2020-01-27 02:21:39 +01:00
CryptHandler* getCryptHandler(){ return &crypt_handler; }
server::VoiceClient* getClient(){ return client; }
#ifdef VC_USE_READ_QUEUE
bool handleNextDatagram();
#endif
2020-01-24 02:57:58 +01:00
/*
* Split packets waiting in write_process_queue and moves the final buffers to writeQueue.
* @returns true when there are more packets to prepare
*/
2019-10-22 18:39:52 +02:00
bool preprocess_write_packets();
/* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */
int pop_write_buffer(pipes::buffer& /* buffer */);
bool wait_empty_write_and_prepare_queue(std::chrono::time_point<std::chrono::system_clock> until = std::chrono::time_point<std::chrono::system_clock>());
protocol::PacketIdManager& getPacketIdManager() { return this->packet_id_manager; }
2020-04-02 19:24:57 +02:00
inline auto& get_incoming_generation_estimators() { return this->incoming_generation_estimators; }
2020-01-24 02:57:58 +01:00
void reset();
2020-01-27 02:21:39 +01:00
void force_insert_command(const pipes::buffer_view& /* payload */);
void register_initiv_packet();
//buffer::SortedBufferQueue<protocol::ClientPacket>** getReadQueue() { return this->readTypedQueue; }
protected:
2020-01-27 02:21:39 +01:00
void handle_incoming_datagram(const pipes::buffer_view &buffer);
2020-01-24 02:57:58 +01:00
bool verify_encryption(const pipes::buffer_view& /* full packet */);
void triggerWrite();
private:
server::VoiceClient* client = nullptr;
//Decryption / encryption stuff
2020-01-27 02:21:39 +01:00
CryptHandler crypt_handler; /* access to CryptHandler is thread save */
CompressionHandler compress_handler;
AcknowledgeManager acknowledge_handler;
2020-03-28 23:08:11 +01:00
std::atomic_bool should_reassembled_reschedule; /* this get checked as soon the command handle lock has been released so trylock will succeed */
//Handle stuff
2020-01-27 02:21:39 +01:00
void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */);
2020-01-27 13:02:22 +01:00
bool next_reassembled_command(std::unique_lock<std::recursive_timed_mutex> &buffer_execute_lock /* packet channel execute lock */, pipes::buffer & /* buffer*/, uint16_t& /* packet id */);
2020-01-24 02:57:58 +01:00
/* ---------- Write declarations ---------- */
spin_lock write_queue_lock; /* queue access isn't for long in general */
2020-01-24 02:57:58 +01:00
std::deque<pipes::buffer> write_queue;
struct WritePreprocessCategory {
enum value {
PING_PONG = 0, //Ping/Pongs
ACK = 2,
VOICE_WHISPER = 1, //Voice/Whisper
COMMAND = 3,
INIT = 4,
MAX = INIT
};
inline static value from_type(protocol::PacketType type) {
switch(type) {
case protocol::PING:
case protocol::PONG:
return value::PING_PONG;
case protocol::VOICE:
case protocol::VOICE_WHISPER:
return value::VOICE_WHISPER;
case protocol::ACK:
case protocol::ACK_LOW:
return value::ACK;
case protocol::COMMAND:
case protocol::COMMAND_LOW:
return value::COMMAND;
default:
return value::INIT;
}
}
};
struct WritePreprocessQueue {
int _zero1{0};
bool has_work{false};
std::mutex work_lock{};
spin_lock queue_lock{};
std::deque<std::shared_ptr<protocol::ServerPacket>> queue{};
int _zero{0};
};
2019-10-22 18:39:52 +02:00
std::array<WritePreprocessQueue, WritePreprocessCategory::MAX> write_preprocess_queues{};
/* ---------- Processing ---------- */
/* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */
2020-01-24 02:57:58 +01:00
protocol::PacketIdManager packet_id_manager;
/* this function is thread save :) */
std::atomic<uint8_t> prepare_process_count{0}; /* current thread count preparing a packet */
2019-10-22 18:39:52 +02:00
bool prepare_packet_for_write(std::vector<pipes::buffer> &/* buffers which need to be transferred */, const std::shared_ptr<protocol::ServerPacket> &/* the packet */, std::unique_lock<std::mutex>& /* work lock */);
2020-01-27 02:21:39 +01:00
std::array<protocol::generation_estimator, 9> incoming_generation_estimators{}; /* implementation is thread save */
2020-01-24 02:57:58 +01:00
std::recursive_mutex packet_buffer_lock;
2020-01-27 02:21:39 +01:00
command_packet_reassembler _command_fragment_buffers;
static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) {
return packet_index & 0x1U; /* use 0 for command and 1 for command low */
}
};
}
}