Fixed crash when client packet failed to decode

This commit is contained in:
WolverinDEV 2020-03-02 14:33:34 +01:00
parent d4bb3864e5
commit e470627d8f
5 changed files with 73 additions and 58 deletions

View File

@ -236,7 +236,7 @@ NAN_METHOD(ServerConnection::connect) {
return;
}
if(!callback->IsFunction() ) {
if(!callback->IsFunction()) {
Nan::ThrowError(tr("Invalid callback"));
return;
}
@ -304,6 +304,8 @@ NAN_METHOD(ServerConnection::connect) {
}
this->socket->on_data = [&](const pipes::buffer_view& buffer) { this->protocol_handler->progress_packet(buffer); };
this->socket->on_fatal_error = [&](int, int) { this->close_connection(); };
if(teamspeak->IsBoolean() && teamspeak->BooleanValue(info.GetIsolate()))
this->protocol_handler->server_type = server_type::TEAMSPEAK;
this->protocol_handler->connect();
@ -603,6 +605,7 @@ void ServerConnection::close_connection() {
if(this->protocol_handler)
this->protocol_handler->do_close_connection();
this->socket = nullptr;
this->call_disconnect_result.call(0, true);
}

View File

@ -83,15 +83,8 @@ void UDPSocket::finalize() {
lock.unlock();
assert(this_thread::get_id() != this->_io_thread.get_id());
if(event_read)
event_del_block(event_read);
if(event_write)
event_del_block(event_write);
if(event_write)
event_del(event_write);
if(event_read)
event_del(event_read);
if(event_read) event_del_block(event_read);
if(event_write) event_del_block(event_write);
if(io_base) {
timeval seconds{1, 0};
@ -136,7 +129,7 @@ void UDPSocket::io_execute() {
}
void UDPSocket::callback_read(evutil_socket_t fd) {
sockaddr source_address{};
socklen_t source_address_length = sizeof(sockaddr);
socklen_t source_address_length;
ssize_t read_length = -1;
size_t buffer_length = 1600; /* IPv6 MTU is ~1.5k */
@ -160,8 +153,15 @@ void UDPSocket::callback_read(evutil_socket_t fd) {
if(errno == EAGAIN)
break;
#endif
logger::warn(category::socket, tr("Failed to receive data: {}"), error);
if(auto callback{this->on_fatal_error}; callback)
callback(1, error);
logger::warn(category::socket, tr("Failed to receive data: {}"), error);
{
std::lock_guard lock{this->io_lock};
if(this->event_read)
event_del_noblock(this->event_read);
}
break; /* this should never happen! */
}
@ -183,7 +183,12 @@ void UDPSocket::callback_write(evutil_socket_t fd) {
auto written = sendto(fd, buffer.data_ptr<char>(), (int) buffer.length(), MSG_DONTWAIT, (sockaddr*) &this->_remote_address, sizeof(this->_remote_address));
if(written != buffer.length()) {
if(errno == EAGAIN) {
int error;
#ifdef WIN32
if(error = WSAGetLastError(); error == WSAEWOULDBLOCK) {
#else
if(error = errno; errno == EAGAIN) {
#endif
lock.lock();
this->write_queue.push_front(buffer);
if(this->event_write)
@ -191,6 +196,9 @@ void UDPSocket::callback_write(evutil_socket_t fd) {
return;
}
logger::warn(category::socket, tr("Failed to send data: {}"), error);
if(auto callback{this->on_fatal_error}; callback)
callback(2, error);
return; /* this should never happen! */
} else {
//logger::trace(category::socket, tr("Wrote {} bytes"), buffer.length());

View File

@ -15,44 +15,44 @@
#include <WinSock2.h>
#endif
namespace tc {
namespace connection {
class UDPSocket {
public:
UDPSocket(const sockaddr_storage& /* target */);
~UDPSocket();
namespace tc::connection {
class UDPSocket {
public:
explicit UDPSocket(const sockaddr_storage& /* target */);
~UDPSocket();
const sockaddr_storage& remote_address() { return this->_remote_address; }
const sockaddr_storage& remote_address() { return this->_remote_address; }
bool initialize();
void finalize();
bool initialize();
void finalize();
void send_message(const pipes::buffer_view& /* message */);
void send_message(const pipes::buffer_view& /* message */);
std::function<void(const pipes::buffer_view& /* message */)> on_data;
/* Callbacks are called within the IO loop. Do not call any other functions except send_message! */
std::function<void(const pipes::buffer_view& /* message */)> on_data;
std::function<void(int /* error code */, int /* description */)> on_fatal_error;
const std::thread& io_thread() { return this->_io_thread; }
private:
static void _io_execute(void *_ptr_socket);
static void _callback_read(evutil_socket_t, short, void*);
static void _callback_write(evutil_socket_t, short, void*);
const std::thread& io_thread() { return this->_io_thread; }
private:
static void _io_execute(void *_ptr_socket);
static void _callback_read(evutil_socket_t, short, void*);
static void _callback_write(evutil_socket_t, short, void*);
void io_execute();
void callback_read(evutil_socket_t);
void callback_write(evutil_socket_t);
void io_execute();
void callback_read(evutil_socket_t);
void callback_write(evutil_socket_t);
sockaddr_storage _remote_address;
sockaddr_storage _remote_address;
int file_descriptor = 0;
int file_descriptor = 0;
std::recursive_mutex io_lock;
std::thread _io_thread;
event_base* io_base = nullptr;
std::recursive_mutex io_lock;
std::thread _io_thread;
event_base* io_base = nullptr;
event* event_read = nullptr;
event* event_write = nullptr;
event* event_read = nullptr;
event* event_write = nullptr;
std::deque<pipes::buffer> write_queue;
};
}
std::deque<pipes::buffer> write_queue;
};
}

View File

@ -558,8 +558,12 @@ void VoiceClient::event_execute(const std::chrono::system_clock::time_point &sch
//TODO: Use statically allocated buffer?
auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer);
this->output_source->enqueue_samples(decoded->sample_data, decoded->sample_size);
this->set_state(state::playing);
if(!decoded) {
log_warn(category::audio, tr("Failed to decode buffer for client {} (nullptr). Dropping buffer."), this->_client_id, error);
} else {
this->output_source->enqueue_samples(decoded->sample_data, decoded->sample_size);
this->set_state(state::playing);
}
}
local_last_pid = replay_head->packet_id;

View File

@ -59,11 +59,9 @@ connection.callback_disconnect = reason => {
const do_connect = () => {
connection.connect({
timeout: 5000,
remote_host: "51.255.107.151",
remote_port: 31515,
//remote_port: 9987,
remote_port: 9987,
//remote_host: "188.40.240.20", /* twerion */
//remote_host: "localhost",
remote_host: "127.0.0.1",
//remote_host: "ts.teaspeak.de",
//remote_host: "51.68.181.92",
//remote_host: "94.130.236.135",
@ -100,19 +98,11 @@ const do_connect = () => {
return_code:91
}
], []);
/*
consumer.callback_data = buffer => {
console.log("Sending voice data");
connection.send_voice_data_raw(buffer, consumer.channels, consumer.sample_rate, true);
//stream.write_data_rated(buffer.buffer, true, consumer.sample_rate);
};
*/
}
},
identity_key: "MG4DAgeAAgEgAiBC9JsqB1am6vowj2obomMyxm1GLk8qyRoxpBkAdiVYxwIgWksaSk7eyVQovZwPZBuiYHARz/xQD5zBUBK6e63V7hICIQCZ2glHe3kV62iIRKpkV2lzZGZtfBPRMbwIcU9aE1EVsg==",
teamspeak: true /* used here to speed up the handsahke process :) */
teamspeak: true /* used here to speed up the handshake process :) */
});
connection.callback_voice_data = (buffer, client_id, codec_id, flag_head, packet_id) => {
@ -120,9 +110,19 @@ const do_connect = () => {
connection.send_voice_data(buffer, codec_id, flag_head);
};
connection.callback_disconnect = error => {
console.log("Disconnect: %s", error);
};
connection.callback_command = (command, arguments1, switches) => {
console.log("Command: %s: %0", command, arguments1);
}
if(command === "channellistfinished") {
setInterval(() => {
console.log("XXX");
connection.send_command("channelcreate", [{"channel_name": Math.random() + " :D"}], []);
}, 10);
}
};
connection._voice_connection.register_client(7);
};