Some license server improvements

This commit is contained in:
WolverinDEV
2020-02-06 13:49:39 +01:00
parent 14d9936d90
commit 3e39bcc38c
9 changed files with 322 additions and 250 deletions
+116 -95
View File
@@ -2,6 +2,7 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <algorithm>
#include <utility>
#include <arpa/inet.h>
#include <log/LogUtils.h>
#include <misc/endianness.h>
@@ -9,6 +10,7 @@
#include <shared/License.h>
#include <shared/crypt.h>
#include <misc/base64.h>
#include <ThreadPool/ThreadHelper.h>
#include "LicenseServer.h"
#include "crypt.h"
#include "UserManager.h"
@@ -19,98 +21,100 @@ using namespace license;
using namespace ts;
LicenseServer::LicenseServer(const sockaddr_in& addr,
const std::shared_ptr<server::LicenseManager>& manager,
const shared_ptr<license::stats::StatisticManager> &stats,
const shared_ptr<license::web::WebStatistics> &wstats,
const std::shared_ptr<UserManager>& user_manager) : manager(manager), statistics(stats), web_statistics(wstats), user_manager(user_manager) {
this->localAddr = new sockaddr_in{};
memcpy(this->localAddr, &addr, sizeof(addr));
std::shared_ptr<server::LicenseManager> manager,
shared_ptr<license::stats::StatisticManager> stats,
shared_ptr<license::web::WebStatistics> wstats,
std::shared_ptr<UserManager> user_manager) : manager{std::move(manager)}, statistics{std::move(stats)}, web_statistics{std::move(wstats)}, user_manager{std::move(user_manager)} {
memcpy(&this->localAddr, &addr, sizeof(addr));
}
LicenseServer::~LicenseServer() {
this->stopServer();
delete this->localAddr;
this->stop();
}
#define SFAIL(message) \
do { \
logError(lstream << (message) << " Message: " << errno << "/" << strerror(errno)); \
this->stopServer(); \
logError(LOG_GENERAL, " Message: {} ({}/{})", message, errno, strerror(errno)); \
this->stop(); \
return false; \
} while(0)
static int enabled = 1;
static int disabled = 0;
bool LicenseServer::startServer() {
bool LicenseServer::start() {
{
lock_guard lock(this->lock);
lock_guard lock(this->client_lock);
if(this->running) return false;
this->running = true;
}
fileDescriptor = socket(AF_INET, SOCK_STREAM, 0);
if (fileDescriptor < 0) SFAIL("Could not create new socket");
server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (server_socket < 0) SFAIL("Could not create new socket");
if(setsockopt(this->fileDescriptor, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) < 0) SFAIL("could not set reuse address");
if(setsockopt(this->fileDescriptor, IPPROTO_TCP, TCP_CORK, &disabled, sizeof(disabled)) < 0) SFAIL("could not set no push");
if(bind(this->fileDescriptor, (struct sockaddr *) this->localAddr, sizeof(sockaddr_in)) < 0) SFAIL("Could not bind socket on " + string(inet_ntoa(this->localAddr->sin_addr)));
if(setsockopt(this->server_socket, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) < 0) SFAIL("could not set reuse address");
if(setsockopt(this->server_socket, IPPROTO_TCP, TCP_CORK, &disabled, sizeof(disabled)) < 0) SFAIL("could not set no push");
if(bind(this->server_socket, (struct sockaddr *) &this->localAddr, sizeof(sockaddr_in)) < 0) SFAIL("Could not bind socket on " + string(inet_ntoa(this->localAddr.sin_addr)));
if(listen(this->fileDescriptor, 32) < 0) SFAIL("Could not listen on socket");
if(listen(this->server_socket, 32) < 0) SFAIL("Could not listen on socket");
this->evBase = event_base_new();
this->acceptEvent = event_new(this->evBase, this->fileDescriptor, EV_READ | EV_PERSIST, LicenseServer::handleEventAccept, this);
this->client_cleanup = evtimer_new(this->evBase, LicenseServer::handleEventCleanup, this);
this->event_accept = event_new(this->evBase, this->server_socket, EV_READ | EV_PERSIST, LicenseServer::handleEventAccept, this);
this->event_cleanup = evtimer_new(this->evBase, LicenseServer::handleEventCleanup, this);
event_add(this->acceptEvent, nullptr);
event_add(this->event_accept, nullptr);
{
timeval now{1, 0};
evtimer_add(this->client_cleanup, &now);
evtimer_add(this->event_cleanup, &now);
}
evBaseDispatch = new threads::Thread(THREAD_SAVE_OPERATIONS, [&](){
event_base_dispatch = std::thread([&]{
signal(SIGPIPE, SIG_IGN);
event_base_dispatch(this->evBase);
::event_base_dispatch(this->evBase);
});
return true;
}
void LicenseServer::stopServer() {
void LicenseServer::stop() {
{
lock_guard lock(this->lock);
lock_guard lock(this->client_lock);
if(!this->running) return;
this->running = false;
}
/* first unregister the accept event so we don't get new clients */
if(this->event_accept) {
event_del(this->event_accept);
event_free(this->event_accept);
}
this->event_accept = nullptr;
/* disconnect all clients */
for(const auto& client : this->getClients())
this->closeConnection(client);
if(this->acceptEvent) {
event_del(this->acceptEvent);
event_free(this->acceptEvent);
}
this->acceptEvent = nullptr;
if(this->client_cleanup) {
event_del_block(this->client_cleanup);
event_free(this->client_cleanup);
this->client_cleanup = nullptr;
}
if(this->evBase)
event_base_loopbreak(this->evBase);
if(this->evBaseDispatch)
this->evBaseDispatch->join();
delete this->evBaseDispatch;
this->evBaseDispatch = nullptr;
if(this->evBase) {
event_base_loopbreak(this->evBase); /* again for some reason */
if(!threads::timed_join(this->event_base_dispatch, std::chrono::seconds{2})) {
this->event_base_dispatch.detach();
logCritical(LOG_GENERAL, "Failed to join event base dispatch thread. This will cause memory leaks.");
}
/* Needs to be cleaned up after event loop has been destroyed. Because its used within the event loop. */
if(this->event_cleanup) {
event_del_block(this->event_cleanup);
event_free(this->event_cleanup);
this->event_cleanup = nullptr;
}
if(this->evBase)
event_base_free(this->evBase);
}
this->evBase = nullptr;
if(this->fileDescriptor != 0) {
shutdown(this->fileDescriptor, SHUT_RDWR);
close(this->fileDescriptor);
this->fileDescriptor = 0;
if(this->server_socket != 0) {
shutdown(this->server_socket, SHUT_RDWR);
close(this->server_socket);
this->server_socket = 0;
}
}
@@ -120,8 +124,8 @@ void LicenseServer::handleEventCleanup(int, short, void* ptrServer) {
server->cleanup_clients();
timeval next{1, 0};
if(server->client_cleanup)
event_add(server->client_cleanup, &next);
if(server->event_cleanup)
event_add(server->event_cleanup, &next);
}
//Basic IO
@@ -129,26 +133,27 @@ void LicenseServer::handleEventWrite(int fd, short, void* ptrServer) {
auto server = static_cast<LicenseServer *>(ptrServer);
auto client = server->findClient(fd);
if(!client) return;
buffer::RawBuffer* buffer = nullptr;
{
threads::MutexLock lock(client->network.lock);
buffer = TAILQ_FIRST(&client->network.writeQueue);
if(!buffer) return;
auto writtenBytes = send(fd, &buffer->buffer[buffer->index], buffer->length - buffer->index, 0);
buffer::RawBuffer* write_buffer{nullptr};
while(true) { //TODO: May add some kind of timeout?
std::lock_guard lock(client->network.write_queue_lock);
write_buffer = TAILQ_FIRST(&client->network.write_queue);
if(!write_buffer) return;
auto writtenBytes = send(fd, &write_buffer->buffer[write_buffer->index], write_buffer->length - write_buffer->index, 0);
if(writtenBytes <= 0) {
if(writtenBytes == -1 && errno == EAGAIN)
return;
logError(LOG_LICENSE_CONTROLL, "Invalid write. Disconnecting remote client. Message: {}/{}", errno, strerror(errno));
} else {
buffer->index += writtenBytes;
write_buffer->index += writtenBytes;
}
if(buffer->index >= buffer->length) {
TAILQ_REMOVE(&client->network.writeQueue, buffer, tail);
delete buffer;
if(write_buffer->index >= write_buffer->length) {
TAILQ_REMOVE(&client->network.write_queue, write_buffer, tail);
delete write_buffer;
}
if(!TAILQ_EMPTY(&client->network.writeQueue))
if(!TAILQ_EMPTY(&client->network.write_queue))
event_add(client->network.writeEvent, nullptr);
}
}
@@ -164,23 +169,31 @@ void ConnectedClient::sendPacket(const protocol::packet& packet) {
xorBuffer(&buffer->buffer[sizeof(packet.header)], packet.data.length(), this->protocol.cryptKey.data(), this->protocol.cryptKey.length());
{
threads::MutexLock lock(this->network.lock);
TAILQ_INSERT_TAIL(&this->network.writeQueue, buffer, tail);
lock_guard queue_lock{this->network.write_queue_lock};
TAILQ_INSERT_TAIL(&this->network.write_queue, buffer, tail);
}
event_add(this->network.writeEvent, nullptr);
{
lock_guard state_lock{this->protocol.state_lock};
if(this->protocol.state == protocol::UNCONNECTED) goto error_cleanup;
event_add(this->network.writeEvent, nullptr);
return;
}
error_cleanup:
delete buffer;
}
void ConnectedClient::init() {
protocol.last_read = std::chrono::system_clock::now();
TAILQ_INIT(&network.writeQueue);
TAILQ_INIT(&network.write_queue);
}
void ConnectedClient::uninit() {
{
threads::MutexLock lock(this->network.lock);
lock_guard queue_lock{this->network.write_queue_lock};
ts::buffer::RawBuffer* buffer;
while ((buffer = TAILQ_FIRST(&this->network.writeQueue))) {
TAILQ_REMOVE(&this->network.writeQueue, buffer, tail);
while ((buffer = TAILQ_FIRST(&this->network.write_queue))) {
TAILQ_REMOVE(&this->network.write_queue, buffer, tail);
delete buffer;
}
}
@@ -236,21 +249,21 @@ void LicenseServer::handleEventAccept(int fd, short, void* ptrServer) {
if(setsockopt(client->network.fileDescriptor, IPPROTO_TCP, TCP_CORK, &disabled, sizeof(disabled)) < 0);// CERR("could not set no push");
if (client->network.fileDescriptor < 0) {
logCritical("Could not accept new client! (" + to_string(client->network.fileDescriptor) + "|" + to_string(errno) + "|" + strerror(errno) + ")");
logCritical(LOG_GENERAL, "Could not accept new client! (" + to_string(client->network.fileDescriptor) + "|" + to_string(errno) + "|" + strerror(errno) + ")");
return;
}
client->protocol.state = protocol::HANDSCAKE;
{
lock_guard lock(server->lock);
server->currentClients.push_back(client);
lock_guard lock(server->client_lock);
server->clients.push_back(client);
}
client->network.readEvent = event_new(server->evBase, client->network.fileDescriptor, EV_READ | EV_PERSIST, LicenseServer::handleEventRead, server);
client->network.writeEvent = event_new(server->evBase, client->network.fileDescriptor, EV_WRITE, LicenseServer::handleEventWrite, server);
event_add(client->network.readEvent, nullptr);
logMessage(lstream << "Got new client from " << inet_ntoa(client->network.remoteAddr.sin_addr));
logMessage(LOG_GENERAL, "Accepted new client from {}", inet_ntoa(client->network.remoteAddr.sin_addr));
}
void LicenseServer::disconnectClient(const std::shared_ptr<ConnectedClient>& client, const std::string &reason) {
@@ -258,15 +271,15 @@ void LicenseServer::disconnectClient(const std::shared_ptr<ConnectedClient>& cli
}
void LicenseServer::closeConnection(const std::shared_ptr<ConnectedClient> &client, bool blocking) {
if(this->evBaseDispatch && threads::self::id() == *this->evBaseDispatch) {
if(this_thread::get_id() == this->event_base_dispatch.get_id()) {
std::thread(std::bind(&LicenseServer::closeConnection, this, client, true)).detach();
return;
}
{
unique_lock lock(client->network.lock);
if(!TAILQ_EMPTY(&client->network.writeQueue)) {
unique_lock lock(client->network.write_queue_lock);
if(!TAILQ_EMPTY(&client->network.write_queue)) {
lock.unlock();
if(!blocking) {
@@ -277,7 +290,7 @@ void LicenseServer::closeConnection(const std::shared_ptr<ConnectedClient> &clie
while(system_clock::now() - start < seconds(5)){
{
lock.lock();
if(TAILQ_EMPTY(&client->network.writeQueue)) break;
if(TAILQ_EMPTY(&client->network.write_queue)) break;
lock.unlock();
}
threads::self::sleep_for(milliseconds(5));
@@ -289,49 +302,57 @@ void LicenseServer::closeConnection(const std::shared_ptr<ConnectedClient> &clie
void LicenseServer::unregisterClient(const std::shared_ptr<ConnectedClient> &client) {
{
lock_guard lock(this->lock);
lock_guard lock(this->client_lock);
auto it = find(this->currentClients.begin(), this->currentClients.end(), client);
if(it != this->currentClients.end())
this->currentClients.erase(it);
auto it = find(this->clients.begin(), this->clients.end(), client);
if(it != this->clients.end())
this->clients.erase(it);
}
client->protocol.state = protocol::UNCONNECTED;
{
std::lock_guard state_lock{client->protocol.state_lock};
client->protocol.state = protocol::UNCONNECTED;
}
client->uninit();
}
void LicenseServer::cleanup_clients() {
unique_lock lock(this->lock);
auto clients = this->currentClients;
unique_lock lock(this->client_lock);
auto clients = this->clients;
lock.unlock();
size_t cleanup_count{0};
for(const auto& client : clients) {
if(client->protocol.last_read + minutes(1) < system_clock::now()) {
cleanup_count++;
if(client->protocol.state != protocol::DISCONNECTING && client->protocol.state != protocol::UNCONNECTED) {
this->disconnectClient(client, "timeout");
this->closeConnection(client);
client->protocol.state = protocol::DISCONNECTING;
std::lock_guard state_lock{client->protocol.state_lock};
client->protocol.state = protocol::UNCONNECTED;
} else {
auto it = find(this->currentClients.begin(), this->currentClients.end(), client);
if(it != this->currentClients.end())
this->currentClients.erase(it);
auto it = find(this->clients.begin(), this->clients.end(), client);
if(it != this->clients.end())
this->clients.erase(it);
}
}
}
debugMessage("Client's cleaned up");
if(cleanup_count)
debugMessage(LOG_GENERAL, "{} clients have been cleaned up due to a read timeout.", cleanup_count);
}
std::shared_ptr<ConnectedClient> LicenseServer::findClient(int fileDescriptor) {
lock_guard lock(this->lock);
for(const auto& cl : this->currentClients)
if(cl->network.fileDescriptor == fileDescriptor)
std::shared_ptr<ConnectedClient> LicenseServer::findClient(int fd) {
lock_guard lock(this->client_lock);
for(const auto& cl : this->clients)
if(cl->network.fileDescriptor == fd)
return cl;
return nullptr;
}
#define ERR(message) \
do { \
logError(lstream << message); \
logError(LOG_GENERAL, message); \
this->closeConnection(client); \
return; \
} while(0)
@@ -374,7 +395,7 @@ void LicenseServer::handleMessage(shared_ptr<ConnectedClient>& client, const std
}
if(!success) {
logError("[CLIENT][" + client->address() + "] Failed to handle packet. message: " + error);
logError(LOG_GENERAL, "[CLIENT][" + client->address() + "] Failed to handle packet. message: " + error);
this->disconnectClient(client, error);
}
}