Teaspeak-Server/server/src/client/web/WSWebClient.cpp

157 lines
5.4 KiB
C++
Raw Normal View History

#include "WebClient.h"
#include <log/LogUtils.h>
#include <src/server/VoiceServer.h>
#include <src/InstanceHandler.h>
using namespace std;
using namespace std::chrono;
using namespace ts;
using namespace ts::server;
using namespace ts::protocol;
void WebClient::handleMessageWrite(int fd, short, void *) {
auto self_lock = _this.lock();
unique_lock buffer_lock(this->queue_lock);
if(this->queue_write.empty()) return;
auto buffer = this->queue_write[0];
this->queue_write.pop_front();
auto written = send(fd, buffer.data_ptr(), buffer.length(), MSG_NOSIGNAL);
if(written == -1) {
buffer_lock.unlock();
if (errno == EINTR || errno == EAGAIN) {
lock_guard event_lock(this->event_lock);
if(this->writeEvent)
event_add(this->writeEvent, nullptr);
return;
} else {
//new ServerConnection(globalClient).startConnection({ host: "localhost", port: 9987}, new HandshakeHandler(profiles.default_profile(), "test"))
{
lock_guard event_lock(this->event_lock);
event_del_noblock(this->writeEvent);
event_free(this->writeEvent);
this->writeEvent = nullptr;
}
debugMessage(this->getServerId(), "[{}] Failed to write message (length {}, errno {}, message {}) Disconnecting client.", CLIENT_STR_LOG_PREFIX, written, errno, strerror(errno));
}
this->closeConnection(system_clock::now()); /* close connection in a new thread */
return;
}
if(written < buffer.length()) {
buffer = buffer.range((size_t) written); /* get the overhead */
this->queue_write.push_front(buffer);
}
if(this->queue_write.empty())
return;
/* reschedule new write */
buffer_lock.unlock();
lock_guard event_lock(this->event_lock);
if(this->writeEvent)
event_add(this->writeEvent, nullptr);
}
void WebClient::handleMessageRead(int fd, short, void *) {
auto self_lock = _this.lock();
size_t buffer_length = 1024 * 4;
uint8_t buffer[buffer_length];
auto length = read(fd, buffer, buffer_length);
if(length <= 0){
if(errno == EINTR || errno == EAGAIN)
;
else {
debugMessage(this->getServerId(), "[{}] Failed to read message (length {}, errno {}, message: {}). Closing connection.", CLIENT_STR_LOG_PREFIX, length, errno, strerror(errno));
{
lock_guard lock(this->event_lock);
if(this->readEvent)
event_del_noblock(this->readEvent);
}
self_lock->closeConnection(system_clock::now()); /* direct close, but from another thread */
}
return;
}
auto pbuffer = buffer::allocate_buffer((size_t) length);
pbuffer.write(buffer, length);
{
lock_guard lock(this->queue_lock);
this->queue_read.push_back(std::move(pbuffer));
}
this->registerMessageProcess();
}
/**
 * Queues `msg` for sending: appends it to the write queue, arms the write
 * event (if one exists) and records the packet length in the connection
 * statistics under the COMMAND category.
 *
 * @param msg  data to send; converted via own_buffer() before being queued.
 */
void WebClient::enqueue_raw_packet(const pipes::buffer_view &msg) {
    /* The previous owns_buffer() conditional called own_buffer() on both branches, so call it directly. */
    auto buffer = msg.own_buffer(); /* TODO: Use buffer::allocate_buffer(...) */
    {
        lock_guard queue_lock(this->queue_lock);
        this->queue_write.push_back(buffer);
    }
    {
        lock_guard lock(this->event_lock);
        if(this->writeEvent)
            event_add(this->writeEvent, nullptr);
    }
    this->connectionStatistics->logOutgoingPacket(stats::ConnectionStatistics::category::COMMAND, buffer.length());
}
/**
 * Schedules `event_handle_packet` on the voice server manager's executor loop.
 * Does nothing unless the manager's state is ServerManager::STARTED.
 */
void WebClient::registerMessageProcess() {
    const auto& manager = serverInstance->getVoiceServerManager();
    if(manager->getState() != ServerManager::STARTED)
        return;

    manager->get_executor_loop()->schedule(this->event_handle_packet);
}
/**
 * Heuristic check whether `buffer` starts like a SSL/TLS handshake record.
 *
 * Requires at least 5 bytes, a first byte of 0x16 (handshake record type) and
 * both following version bytes to lie within [1, 3].
 *
 * @return true if all checks pass, false otherwise.
 */
inline bool is_ssl_handshake_header(const pipes::buffer_view& buffer) {
    /* Too short to hold a complete record header. */
    if(buffer.length() < 0x05)
        return false;

    /* Byte 0: record type, must be "handshake". */
    if(buffer[0] != 0x16)
        return false;

    /* Bytes 1 and 2: the two protocol version bytes. */
    const bool first_version_ok = buffer[1] >= 1 && buffer[1] <= 3;
    if(!first_version_ok)
        return false;

    const bool second_version_ok = buffer[2] >= 1 && buffer[2] <= 3;
    return second_version_ok;
}
void WebClient::processNextMessage(const std::chrono::system_clock::time_point& /* scheduled */) {
lock_guard execute_lock(this->execute_lock);
if(this->state != ConnectionState::INIT_HIGH && this->state != ConnectionState::INIT_LOW && this->state != ConnectionState::CONNECTED)
return;
unique_lock buffer_lock(this->queue_lock);
if(this->queue_read.empty())
return;
auto buffer = this->queue_read.front();
this->queue_read.pop_front();
bool has_next = !this->queue_read.empty();
buffer_lock.unlock();
2019-09-14 12:06:48 +02:00
this->connectionStatistics->logIncomingPacket(stats::ConnectionStatistics::category::COMMAND, buffer.length());
2020-01-24 02:57:58 +01:00
if(!this->ssl_detected) {
this->ssl_detected = true;
this->ssl_encrypted = is_ssl_handshake_header(buffer);
if(this->ssl_encrypted)
2020-01-24 02:57:58 +01:00
logMessage(this->getServerId(), "[{}] Using encrypted basic connection.", CLIENT_STR_LOG_PREFIX_(this));
else
logMessage(this->getServerId(), "[{}] Using unencrypted basic connection.", CLIENT_STR_LOG_PREFIX_(this));
2020-01-24 02:57:58 +01:00
}
if(this->ssl_encrypted) {
2020-01-24 02:57:58 +01:00
this->ssl_handler.process_incoming_data(buffer);
} else {
2020-01-24 02:57:58 +01:00
this->ws_handler.process_incoming_data(buffer);
}
if(has_next)
this->registerMessageProcess();
}