#include "Socket.h" #include "../logger.h" #include #include #include #include #ifdef WIN32 #include typedef int socklen_t; #else #include #include #endif using namespace std; using namespace tc::connection; UDPSocket::UDPSocket(const sockaddr_storage &address) { memcpy(&this->_remote_address, &address, sizeof(sockaddr_storage)); } UDPSocket::~UDPSocket() { this->finalize(); } bool UDPSocket::initialize() { if(this->file_descriptor > 0) return false; this->file_descriptor = socket(this->_remote_address.ss_family, SOCK_DGRAM, 0); if(this->file_descriptor < 2) { this->file_descriptor = 0; return false; } /* * TODO: Make configurable */ //uint8_t value = IPTOS_DSCP_EF; //if(setsockopt(this->file_descriptor, IPPROTO_IP, IP_TOS, &value, sizeof(value)) < 0) // log_warn(category::connection, "Failed to set TOS high priority on socket"); this->io_base = event_base_new(); if(!this->io_base) { /* may too many file descriptors already open */ this->finalize(); return false; } this->event_read = event_new(this->io_base, this->file_descriptor, EV_READ | EV_PERSIST, &UDPSocket::_callback_read, this); this->event_write = event_new(this->io_base, this->file_descriptor, EV_WRITE, &UDPSocket::_callback_write, this); event_add(this->event_read, nullptr); this->_io_thread = thread(&UDPSocket::_io_execute, this); #ifdef WIN32 //TODO set thread name #else auto handle = this->_io_thread.native_handle(); pthread_setname_np(handle, "UDPSocket loop"); #endif return true; } void UDPSocket::finalize() { if(this->file_descriptor == 0) return; unique_lock lock(this->io_lock); auto event_read = this->event_read, event_write = this->event_write; auto io_base = this->io_base; this->io_base = nullptr; this->event_read = nullptr; this->event_write = nullptr; lock.unlock(); assert(this_thread::get_id() != this->_io_thread.get_id()); if(event_read) event_del_block(event_read); if(event_write) event_del_block(event_write); if(event_write) event_del(event_write); if(event_read) event_del(event_read); if(io_base) { timeval seconds{1, 0}; event_base_loopexit(io_base, &seconds); event_base_loopexit(io_base, nullptr); } if(this->_io_thread.joinable()) this->_io_thread.join(); if(io_base) event_base_free(io_base); #ifdef WIN32 if(::closesocket(this->file_descriptor) != 0) { #else if(::close(this->file_descriptor) != 0) { #endif if(errno != EBADF) logger::warn(category::socket, tr("Failed to close file descriptor ({}/{})"), to_string(errno), strerror(errno)); } this->file_descriptor = 0; } void UDPSocket::_callback_write(evutil_socket_t fd, short, void *_ptr_socket) { ((UDPSocket*) _ptr_socket)->callback_write(fd); } void UDPSocket::_callback_read(evutil_socket_t fd, short, void *_ptr_socket) { ((UDPSocket*) _ptr_socket)->callback_read(fd); } void UDPSocket::_io_execute(void *_ptr_socket) { ((UDPSocket*) _ptr_socket)->io_execute(); } void UDPSocket::io_execute() { while(this->io_base) { event_base_loop(this->io_base, 0); } } void UDPSocket::callback_read(evutil_socket_t fd) { sockaddr source_address{}; socklen_t source_address_length = sizeof(sockaddr); ssize_t buffer_length = 1600; /* IPv6 MTU is ~1.5k */ char buffer[1600]; buffer_length = recvfrom(fd, (char*) buffer, buffer_length, 0, &source_address, &source_address_length); if(buffer_length <= 0) { if(errno == EAGAIN) return; logger::warn(category::socket, tr("Failed to receive data: {}/{}"), errno, strerror(errno)); return; /* this should never happen! */ } if(this->on_data) this->on_data(pipes::buffer_view{buffer, (size_t) buffer_length}); } void UDPSocket::callback_write(evutil_socket_t fd) { unique_lock lock(this->io_lock); if(this->write_queue.empty()) return; auto buffer = this->write_queue.front(); this->write_queue.pop_front(); lock.unlock(); auto written = sendto(fd, buffer.data_ptr(), buffer.length(), 0, (sockaddr*) &this->_remote_address, sizeof(this->_remote_address)); if(written != buffer.length()) { if(errno == EAGAIN) { lock.lock(); this->write_queue.push_front(buffer); if(this->event_write) event_add(this->event_write, nullptr); return; } return; /* this should never happen! */ } lock.lock(); if(!this->write_queue.empty() && this->event_write) event_add(this->event_write, nullptr); } void UDPSocket::send_message(const pipes::buffer_view &buffer) { auto buf = buffer.own_buffer(); unique_lock lock(this->io_lock); this->write_queue.push_back(buf); if(this->event_write) event_add(this->event_write, nullptr); }