Some more updates

This commit is contained in:
WolverinDEV 2020-09-06 21:00:27 +02:00
parent 7d0db0dea0
commit 4f5a4dc993
16 changed files with 453 additions and 202 deletions

@ -1 +1 @@
Subproject commit b83a798e5348cbf916b84c1ab684adbadbaf9a71
Subproject commit 8fbf120f756d603da38029843a8844a743440bac

View File

@ -257,7 +257,7 @@ target_link_libraries(PermMapHelper
SET(CPACK_PACKAGE_VERSION_MAJOR "1")
SET(CPACK_PACKAGE_VERSION_MINOR "4")
SET(CPACK_PACKAGE_VERSION_PATCH "19")
SET(CPACK_PACKAGE_VERSION_PATCH "20")
if (BUILD_TYPE_NAME EQUAL OFF)
SET(CPACK_PACKAGE_VERSION_DATA "beta")
elseif (BUILD_TYPE_NAME STREQUAL "")

View File

@ -20,6 +20,9 @@ using namespace ts::protocol;
//#define PKT_LOG_VOICE
//#define PKT_LOG_WHISPER
constexpr static auto kMaxWhisperClientNameLength{30};
constexpr static auto kWhisperClientUniqueIdLength{28}; /* base64 encoded SHA1 hash */
constexpr static auto kWhisperMaxHeaderLength{2 + 2 + 1 + 2 + kWhisperClientUniqueIdLength + 1 + kMaxWhisperClientNameLength};
bool SpeakingClient::shouldReceiveVoice(const std::shared_ptr<ConnectedClient> &sender) {
//if(this->properties()[property::CLIENT_AWAY].as<bool>()) return false;
@ -150,42 +153,35 @@ inline bool update_whisper_error(std::chrono::system_clock::time_point& last) {
//Server group => type := SERVER_GROUP and target_id := <server group id>
//Channel group => type := CHANNEL_GROUP and target_id := <channel group id>
//Channel commander => type := CHANNEL_COMMANDER and target_id := 0
void SpeakingClient::handlePacketVoiceWhisper(const pipes::buffer_view& data, bool new_packet) {
if(data.length() < 5) {
void SpeakingClient::handlePacketVoiceWhisper(const pipes::buffer_view& payload, bool new_packet, bool head) {
if(payload.length() < 5) {
this->disconnect("Invalid packet (Voice whisper)");
logMessage(this->getServerId(), "{} Tried to send a too short whisper packet. Length: {}", CLIENT_STR_LOG_PREFIX, data.length());
logMessage(this->getServerId(), "{} Tried to send a too short whisper packet. Length: {}", CLIENT_STR_LOG_PREFIX, payload.length());
return;
}
uint16_t offset = 0;
auto vpacketId = be2le16((char*) data.data_ptr(), offset, &offset);
auto codec = (uint8_t) data[offset++];
uint16_t payload_offset{0};
auto voice_packet_id = be2le16((char*) payload.data_ptr(), payload_offset, &payload_offset);
auto voice_codec = (uint8_t) payload[payload_offset++];
VoicePacketFlags flags{};
flags.head = false;
flags.fragmented = false;
flags.new_protocol = new_packet;
std::deque<std::shared_ptr<SpeakingClient>> target_clients;
if(new_packet) {
if(data.length() < 7) {
if(payload.length() < 7) {
this->disconnect("Invalid packet (Voice whisper | New)");
logMessage(this->getServerId(), "{} Tried to send a too short whisper packet. Length: {}", CLIENT_STR_LOG_PREFIX, data.length());
logMessage(this->getServerId(), "{} Tried to send a too short whisper packet. Length: {}", CLIENT_STR_LOG_PREFIX, payload.length());
return;
}
auto type = (WhisperType) data[offset++];
auto target = (WhisperTarget) data[offset++];
auto type_id = be2le64((char*) data.data_ptr(), offset, &offset);
this->resetIdleTime();
auto type = (WhisperType) payload[payload_offset++];
auto target = (WhisperTarget) payload[payload_offset++];
auto type_id = be2le64((char*) payload.data_ptr(), payload_offset, &payload_offset);
size_t data_length = data.length() - offset;
#ifdef PKT_LOG_WHISPER
logTrace(this->getServerId(), "{} Whisper data length: {}. Type: {}. Target: {}. Target ID: {}.", CLIENT_STR_LOG_PREFIX, data_length, type, target, type_id);
#endif
deque<shared_ptr<SpeakingClient>> available_clients;
if(type == WhisperType::ECHO) {
available_clients.push_back(dynamic_pointer_cast<SpeakingClient>(this->ref()));
target_clients.push_back(dynamic_pointer_cast<SpeakingClient>(this->ref()));
} else {
for(const auto& client : this->server->getClients()) {
auto speakingClient = dynamic_pointer_cast<SpeakingClient>(client);
@ -193,84 +189,82 @@ void SpeakingClient::handlePacketVoiceWhisper(const pipes::buffer_view& data, bo
if(!speakingClient->currentChannel) continue;
if(type == WhisperType::ALL) {
available_clients.push_back(speakingClient);
target_clients.push_back(speakingClient);
} else if(type == WhisperType::SERVER_GROUP) {
if(type_id == 0)
available_clients.push_back(speakingClient);
target_clients.push_back(speakingClient);
else {
shared_lock client_lock(this->channel_lock);
for(const auto& id : client->cached_server_groups) {
if(id == type_id) {
available_clients.push_back(speakingClient);
target_clients.push_back(speakingClient);
break;
}
}
}
} else if(type == WhisperType::CHANNEL_GROUP) {
if(client->cached_channel_group == type_id)
available_clients.push_back(speakingClient);
target_clients.push_back(speakingClient);
} else if(type == WhisperType::CHANNEL_COMMANDER) {
if(client->properties()[property::CLIENT_IS_CHANNEL_COMMANDER].as<bool>())
available_clients.push_back(speakingClient);
target_clients.push_back(speakingClient);
}
}
if(target == WhisperTarget::CHANNEL_CURRENT) {
available_clients.erase(std::remove_if(available_clients.begin(), available_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
target_clients.erase(std::remove_if(target_clients.begin(), target_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
return target->currentChannel != this->currentChannel;
}), available_clients.end());
}), target_clients.end());
} else if(target == WhisperTarget::CHANNEL_PARENT) {
auto current_parent = this->currentChannel->parent();
if(!current_parent) return;
available_clients.erase(std::remove_if(available_clients.begin(), available_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
target_clients.erase(std::remove_if(target_clients.begin(), target_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
return target->currentChannel != current_parent;
}), available_clients.end());
}), target_clients.end());
} else if(target == WhisperTarget::CHANNEL_ALL_PARENT) {
shared_ptr<BasicChannel> current_parent;
{
current_parent = this->currentChannel->parent();
if(!current_parent) return;
}
available_clients.erase(std::remove_if(available_clients.begin(), available_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
target_clients.erase(std::remove_if(target_clients.begin(), target_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
auto tmp_parent = current_parent;
while(tmp_parent && tmp_parent != target->currentChannel)
tmp_parent = tmp_parent->parent();
return target->currentChannel != tmp_parent;
}), available_clients.end());
}), target_clients.end());
} else if(target == WhisperTarget::CHANNEL_FAMILY) {
shared_ptr<BasicChannel> current = this->currentChannel;
available_clients.erase(std::remove_if(available_clients.begin(), available_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
target_clients.erase(std::remove_if(target_clients.begin(), target_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
auto tmp_current = target->currentChannel;
while(tmp_current && tmp_current != current)
tmp_current = tmp_current->parent();
return current != tmp_current;
}), available_clients.end());
}), target_clients.end());
} else if(target == WhisperTarget::CHANNEL_COMPLETE_FAMILY) {
shared_ptr<BasicChannel> current = this->currentChannel;
while(current && current->parent()) current = current->parent();
available_clients.erase(std::remove_if(available_clients.begin(), available_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
target_clients.erase(std::remove_if(target_clients.begin(), target_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
auto tmp_current = target->currentChannel;
while(tmp_current && tmp_current != current)
tmp_current = tmp_current->parent();
return current != tmp_current;
}), available_clients.end());
}), target_clients.end());
} else if(target == WhisperTarget::CHANNEL_SUBCHANNELS) {
shared_ptr<BasicChannel> current = this->currentChannel;
available_clients.erase(std::remove_if(available_clients.begin(), available_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
target_clients.erase(std::remove_if(target_clients.begin(), target_clients.end(), [&](const shared_ptr<SpeakingClient>& target) {
return target->currentChannel->parent() != current;
}), available_clients.end());
}), target_clients.end());
}
auto self_lock = this->_this.lock();
available_clients.erase(std::remove_if(available_clients.begin(), available_clients.end(), [&](const std::shared_ptr<ConnectedClient>& cl) {
target_clients.erase(std::remove_if(target_clients.begin(), target_clients.end(), [&](const std::shared_ptr<ConnectedClient>& cl) {
auto speakingClient = dynamic_pointer_cast<SpeakingClient>(cl);
return !speakingClient->shouldReceiveVoiceWhisper(self_lock);
}), available_clients.end());
}), target_clients.end());
if(available_clients.empty()) {
if(target_clients.empty()) {
if(update_whisper_error(this->speak_last_no_whisper_target)) {
command_result result{error::whisper_no_targets};
this->notifyError(result);
@ -278,7 +272,7 @@ void SpeakingClient::handlePacketVoiceWhisper(const pipes::buffer_view& data, bo
return;
}
if(available_clients.size() > this->server->properties()[property::VIRTUALSERVER_MIN_CLIENTS_IN_CHANNEL_BEFORE_FORCED_SILENCE].as_save<size_t>()) {
if(target_clients.size() > this->server->properties()[property::VIRTUALSERVER_MIN_CLIENTS_IN_CHANNEL_BEFORE_FORCED_SILENCE].as_save<size_t>()) {
if(update_whisper_error(this->speak_last_too_many_whisper_targets)) {
command_result result{error::whisper_too_many_targets};
this->notifyError(result);
@ -286,103 +280,143 @@ void SpeakingClient::handlePacketVoiceWhisper(const pipes::buffer_view& data, bo
return;
}
}
//Create the packet data
char packet_buffer[OUT_WHISPER_PKT_OFFSET + data_length];
if(offset < data.length())
memcpy(&packet_buffer[OUT_WHISPER_PKT_OFFSET], &data[offset], data_length);
le2be16(vpacketId, packet_buffer, 0);
le2be16(this->getClientId(), packet_buffer, 2);
packet_buffer[4] = codec;
VoicePacketFlags flags{};
auto data = pipes::buffer_view(packet_buffer, OUT_WHISPER_PKT_OFFSET + data_length);
for(const auto& cl : available_clients){
cl->send_voice_whisper_packet(data, flags);
}
this->updateSpeak(false, system_clock::now());
} else {
auto clientCount = (uint8_t) data[offset++];
auto channelCount = (uint8_t) data[offset++];
if(data.length() < 5 + channelCount * 2 + clientCount * 8) {
logMessage(this->getServerId(), "{} Tried to send a too short whisper packet. Length: {} Required: {}", CLIENT_STR_LOG_PREFIX, data.length(), to_string(5 + channelCount * 2 + clientCount * 8));
auto channelCount = (uint8_t) payload[payload_offset++];
auto clientCount = (uint8_t) payload[payload_offset++];
if(payload.length() < 5 + clientCount * 2 + channelCount * 8) {
logMessage(this->getServerId(), "{} Tried to send a too short whisper packet. Length: {} Required: {}", CLIENT_STR_LOG_PREFIX, payload.length(), to_string(5 + channelCount * 2 + clientCount * 8));
return;
}
this->resetIdleTime();
ChannelId channelIds[clientCount];
ClientId clientIds[channelCount];
ChannelId channelIds[channelCount];
ClientId clientIds[clientCount];
for(uint8_t index = 0; index < clientCount; index++)
channelIds[index] = be2le64((char*) data.data_ptr(), offset, &offset);
for(uint8_t index = 0; index < channelCount; index++)
clientIds[index] = be2le16((char*) data.data_ptr(), offset, &offset);
auto available_clients = this->server->getClients();
available_clients.erase(std::remove_if(available_clients.begin(), available_clients.end(), [&](const std::shared_ptr<ConnectedClient>& cl) {
auto speakingClient = dynamic_pointer_cast<SpeakingClient>(cl);
if(!speakingClient || cl == this || !speakingClient->currentChannel) return true;
auto clientChannelId = cl->currentChannel->channelId();
auto clientId = cl->getClientId();
for(uint8_t index = 0; index < clientCount; index++)
if(channelIds[index] == clientChannelId) return false;
for(uint8_t index = 0; index < channelCount; index++)
if(clientIds[index] == clientId) return false;
return true;
}), available_clients.end());
auto self_lock = this->_this.lock();
available_clients.erase(std::remove_if(available_clients.begin(), available_clients.end(), [&](const std::shared_ptr<ConnectedClient>& cl) {
auto speakingClient = dynamic_pointer_cast<SpeakingClient>(cl);
return !speakingClient->shouldReceiveVoiceWhisper(self_lock);
}), available_clients.end());
if(available_clients.empty()) {
if(update_whisper_error(this->speak_last_no_whisper_target)) {
command_result result{error::whisper_no_targets};
this->notifyError(result);
}
return;
}
if(available_clients.size() > this->server->properties()[property::VIRTUALSERVER_MIN_CLIENTS_IN_CHANNEL_BEFORE_FORCED_SILENCE].as_save<size_t>()) {
if(update_whisper_error(this->speak_last_too_many_whisper_targets)) {
command_result result{error::whisper_too_many_targets};
this->notifyError(result);
}
return;
for(uint8_t index = 0; index < channelCount; index++) {
channelIds[index] = be2le64((char*) payload.data_ptr(), payload_offset, &payload_offset);
}
for(uint8_t index = 0; index < clientCount; index++) {
clientIds[index] = be2le16((char*) payload.data_ptr(), payload_offset, &payload_offset);
}
size_t dataLength = data.length() - offset;
#ifdef PKT_LOG_WHISPER
logTrace(this->getServerId(), "{} Whisper data length: {}. Client count: {}. Channel count: {}.", CLIENT_STR_LOG_PREFIX, dataLength, clientCount, channelCount);
#endif
for(const auto& client : this->server->getClients()) {
auto speaking_client = dynamic_pointer_cast<SpeakingClient>(client);
if(!speaking_client || client == this || !speaking_client->currentChannel)
continue;
auto clientChannelId = speaking_client->getChannelId();
auto clientId = speaking_client->getClientId();
for(uint8_t index = 0; index < channelCount; index++) {
if(channelIds[index] == clientChannelId) {
goto add_client;
}
}
for(uint8_t index = 0; index < clientCount; index++) {
if(clientIds[index] == clientId) {
goto add_client;
}
}
continue;
add_client:
if(!speaking_client->shouldReceiveVoiceWhisper(this->ref())) {
continue;
}
target_clients.push_back(speaking_client);
}
}
if(target_clients.empty()) {
if(update_whisper_error(this->speak_last_no_whisper_target)) {
command_result result{error::whisper_no_targets};
this->notifyError(result);
}
return;
}
if(target_clients.size() > this->server->properties()[property::VIRTUALSERVER_MIN_CLIENTS_IN_CHANNEL_BEFORE_FORCED_SILENCE].as_save<size_t>()) {
if(update_whisper_error(this->speak_last_too_many_whisper_targets)) {
command_result result{error::whisper_too_many_targets};
this->notifyError(result);
}
return;
}
/* send the packet */
{
size_t voice_payload_length = payload.length() - payload_offset;
//Create the packet data
char packetBuffer[OUT_WHISPER_PKT_OFFSET + dataLength];
if(offset < data.length())
memcpy(&packetBuffer[OUT_WHISPER_PKT_OFFSET], &data[offset], dataLength);
char whisper_packet_buffer[kWhisperMaxHeaderLength + voice_payload_length];
size_t whisper_packet_offset{0};
size_t whisper_packet_teamspeak_offset{0};
le2be16(vpacketId, packetBuffer, 0);
le2be16(this->getClientId(), packetBuffer, 2);
packetBuffer[4] = codec;
/* writing the teaspeak header */
if(head) {
auto uniqueId = this->getUid();
auto nickname = this->getDisplayName();
VoicePacketFlags flags{};
auto data = pipes::buffer_view(packetBuffer, OUT_WHISPER_PKT_OFFSET + dataLength);
if(uniqueId.length() > kWhisperClientUniqueIdLength) {
logCritical(LOG_GENERAL, "Clients unique id is longer than the expected max length of {}. Unique length: {}", kWhisperClientUniqueIdLength, uniqueId.length());
return;
}
for(const auto& cl : available_clients){ //Faster?
auto speakingClient = dynamic_pointer_cast<SpeakingClient>(cl);
assert(speakingClient);
if(speakingClient->shouldReceiveVoiceWhisper(_this.lock()))
speakingClient->send_voice_whisper_packet(data, flags);
if(nickname.length() > kMaxWhisperClientNameLength) {
logCritical(LOG_GENERAL, "Clients name is longer than the expected max length of {}. Name length: {}", kMaxWhisperClientNameLength, nickname.length());
return;
}
memset(whisper_packet_buffer + whisper_packet_offset, 0, kWhisperClientUniqueIdLength);
memcpy(whisper_packet_buffer + whisper_packet_offset, uniqueId.data(), uniqueId.length());
whisper_packet_offset += kWhisperClientUniqueIdLength;
whisper_packet_buffer[whisper_packet_offset++] = nickname.length();
memcpy(whisper_packet_buffer + whisper_packet_offset, nickname.data(), nickname.length());
whisper_packet_offset += nickname.length();
}
this->updateSpeak(false, system_clock::now());
/* writing the "normal" header and payload */
{
whisper_packet_teamspeak_offset = whisper_packet_offset;
*(uint16_t*) &whisper_packet_buffer[whisper_packet_offset] = htons(voice_packet_id);
whisper_packet_offset += 2;
*(uint16_t*) &whisper_packet_buffer[whisper_packet_offset] = htons(this->getClientId());
whisper_packet_offset += 2;
whisper_packet_buffer[whisper_packet_offset++] = voice_codec;
if(voice_payload_length > 0) {
memcpy(&whisper_packet_buffer[whisper_packet_offset], &payload[payload_offset], voice_payload_length);
whisper_packet_offset += voice_payload_length;
}
}
VoicePacketFlags flags{};
flags.head = head;
pipes::buffer_view teaspeak_packet{}, teamspeak_packet{};
teaspeak_packet = pipes::buffer_view{whisper_packet_buffer, whisper_packet_offset};
teamspeak_packet = pipes::buffer_view{whisper_packet_buffer + whisper_packet_teamspeak_offset, whisper_packet_offset - whisper_packet_teamspeak_offset};
for(const auto& cl : target_clients) {
if(cl->shouldReceiveVoiceWhisper(_this.lock())) {
cl->send_voice_whisper_packet(teamspeak_packet, teaspeak_packet, flags);
}
}
}
this->resetIdleTime();
this->updateSpeak(false, std::chrono::system_clock::now());
}
#define TEST_PARM(type) \

View File

@ -43,7 +43,7 @@ namespace ts::server {
//Whisper
bool shouldReceiveVoiceWhisper(const std::shared_ptr<ConnectedClient> &sender);
virtual void send_voice_whisper_packet(const pipes::buffer_view& /* voice packet data */, const VoicePacketFlags& /* flags */) = 0;
virtual void send_voice_whisper_packet(const pipes::buffer_view& /* teamspeak payload */, const pipes::buffer_view& /* teaspeak payload */, const VoicePacketFlags& /* flags */) = 0;
inline std::chrono::milliseconds takeSpokenTime() {
auto time = this->speak_time;
@ -62,7 +62,7 @@ namespace ts::server {
public:
void handlePacketVoice(const pipes::buffer_view&, bool head, bool fragmented);
virtual void handlePacketVoiceWhisper(const pipes::buffer_view&, bool /* new */);
virtual void handlePacketVoiceWhisper(const pipes::buffer_view&, bool /* new */, bool /* head */);
void processJoin();
void processLeave();

View File

@ -896,7 +896,7 @@ command_result ConnectedClient::handleCommandClientGetUidFromClid(Command &cmd)
bool found = false;
auto client_list = this->server->getClients();
Command notify(this->getExternalType() == CLIENT_TEAMSPEAK ? "notifyclientgetuidfromclid" : "");
Command notify(this->getExternalType() == CLIENT_TEAMSPEAK ? "notifyclientuidfromclid" : "");
int result_index = 0;
for(int index = 0; index < cmd.bulkCount(); index++) {

View File

@ -2819,6 +2819,7 @@ command_result ConnectedClient::handleCommandListFeatureSupport(ts::Command &cmd
REGISTER_FEATURE("error-bulks", FeatureSupportMode::FULL, 1);
REGISTER_FEATURE("advanced-channel-chat", FeatureSupportMode::FULL, 1);
REGISTER_FEATURE("log-query", FeatureSupportMode::FULL, 1);
REGISTER_FEATURE("whisper-echo", FeatureSupportMode::FULL, 1);
this->sendCommand(notify);
return command_result{error::ok};

View File

@ -257,14 +257,18 @@ void VoiceClient::send_voice_packet(const pipes::buffer_view &voice_buffer, cons
this->connection->send_packet(PacketType::VOICE, packet_flags, voice_buffer.data_ptr<void>(), voice_buffer.length());
}
void VoiceClient::send_voice_whisper_packet(const pipes::buffer_view &voice_buffer, const SpeakingClient::VoicePacketFlags &flags) {
void VoiceClient::send_voice_whisper_packet(const pipes::buffer_view &teamspeak_packet, const pipes::buffer_view &teaspeak_packet, const SpeakingClient::VoicePacketFlags &flags) {
PacketFlag::PacketFlags packet_flags{PacketFlag::None};
packet_flags |= flags.encrypted ? 0U : PacketFlag::Unencrypted;
packet_flags |= flags.head ? PacketFlag::Compressed : 0U;
packet_flags |= flags.fragmented ? PacketFlag::Fragmented : 0U;
packet_flags |= flags.new_protocol ? PacketFlag::NewProtocol : 0U;
this->connection->send_packet(PacketType::VOICE_WHISPER, packet_flags, voice_buffer.data_ptr<void>(), voice_buffer.length());
if(this->getType() == ClientType::CLIENT_TEASPEAK) {
this->connection->send_packet(PacketType::VOICE_WHISPER, packet_flags, teaspeak_packet.data_ptr<void>(), teaspeak_packet.length());
} else {
this->connection->send_packet(PacketType::VOICE_WHISPER, packet_flags, teamspeak_packet.data_ptr<void>(), teamspeak_packet.length());
}
}
float VoiceClient::current_ping_deviation() {

View File

@ -90,7 +90,11 @@ namespace ts {
void handlePacketCommand(const pipes::buffer_view&);
public:
void send_voice_packet(const pipes::buffer_view &packet, const VoicePacketFlags &flags) override;
void send_voice_whisper_packet(const pipes::buffer_view &packet, const VoicePacketFlags &flags) override;
void send_voice_whisper_packet(
const pipes::buffer_view &/* teamspeak packet */,
const pipes::buffer_view &/* teaspeak packet */,
const VoicePacketFlags &flags
) override;
protected:
virtual command_result handleCommand(Command &command) override;

View File

@ -35,7 +35,7 @@ void VoiceClientConnection::handlePacketVoiceWhisper(const ts::protocol::ClientP
auto client = this->getCurrentClient();
if(!client) return;
client->handlePacketVoiceWhisper(packet.payload(), (packet.flags() & PacketFlag::NewProtocol) > 0);
client->handlePacketVoiceWhisper(packet.payload(), (packet.flags() & PacketFlag::NewProtocol) > 0, (packet.flags() & PacketFlag::Compressed) > 0);
}
void VoiceClientConnection::handlePacketAck(const protocol::ClientPacketParser& packet) {

View File

@ -3,7 +3,6 @@
#include <misc/endianness.h>
#include <dlfcn.h>
#include "WebClient.h"
#include "VoiceBridge.h"
using namespace std;
using namespace ts;
@ -162,10 +161,10 @@ void VoiceBridge::handle_media_stream(const std::shared_ptr<rtc::Channel> &undef
} else if(undefined_stream->type() == rtc::CHANTYPE_AUDIO) {
auto stream = dynamic_pointer_cast<rtc::AudioChannel>(undefined_stream);
if(!stream) return;
this->_audio_channel = stream;
logTrace(this->server_id(), "Audio channel extensions:");
for(const auto& ex : stream->list_extensions()) {
debugMessage(0, "{} | {}", ex->name, ex->id);
logTrace(this->server_id(), " - {}: {}", ex->id, ex->name);
}
stream->register_local_extension("urn:ietf:params:rtp-hdrext:ssrc-audio-level");
@ -175,7 +174,20 @@ void VoiceBridge::handle_media_stream(const std::shared_ptr<rtc::Channel> &undef
break;
}
}
stream->incoming_data_handler = [&](const std::shared_ptr<rtc::MediaChannel> &channel, const pipes::buffer_view &data, size_t payload_offset) { this->handle_audio_data(channel, data, payload_offset); };
if(!this->incoming_voice_channel_.lock()) {
debugMessage(this->server_id(), "Having client's voice audio stream.");
this->incoming_voice_channel_ = stream;
stream->incoming_data_handler = [&](const std::shared_ptr<rtc::MediaChannel> &channel, const pipes::buffer_view &data, size_t payload_offset) {
this->handle_audio_voice_data(channel, data, payload_offset); };
} else if(!this->incoming_whisper_channel_.lock()) {
debugMessage(this->server_id(), "Having client's whispers audio stream.");
this->incoming_whisper_channel_ = stream;
stream->incoming_data_handler = [&](const std::shared_ptr<rtc::MediaChannel> &channel, const pipes::buffer_view &data, size_t payload_offset) {
this->handle_audio_voice_whisper_data(channel, data, payload_offset); };
} else {
debugMessage(this->server_id(), "Client sdp offer contains more than two voice channels.");
}
} else {
logError(this->server_id(), "Got offer for unknown channel of type {}", undefined_stream->type());
}
@ -192,7 +204,7 @@ void VoiceBridge::handle_data_channel(const std::shared_ptr<rtc::DataChannel> &c
if(buffer.length() < 2)
return;
this->callback_voice_data(buffer.view(2), buffer[0] == 1, buffer[1] == 1); /* buffer.substr(2), buffer[0] == 1, buffer[1] == 1 */
this->callback_voice_data(buffer.view(2), buffer[0] == 1);
};
channel->callback_close = [&, weak_channel] {
@ -212,7 +224,7 @@ void VoiceBridge::handle_data_channel(const std::shared_ptr<rtc::DataChannel> &c
if(buffer.length() < 1)
return;
this->callback_voice_whisper_data(buffer.view(2), buffer[0] == 1);
this->callback_voice_whisper_data(buffer.view(1), buffer[0] == 1);
};
channel->callback_close = [&, weak_channel] {
@ -225,31 +237,67 @@ void VoiceBridge::handle_data_channel(const std::shared_ptr<rtc::DataChannel> &c
}
}
void VoiceBridge::handle_audio_data(const std::shared_ptr<rtc::MediaChannel> &channel, const pipes::buffer_view &data, size_t payload_offset) {
void VoiceBridge::handle_audio_voice_data(const std::shared_ptr<rtc::MediaChannel> &channel, const pipes::buffer_view &data, size_t payload_offset) {
if(channel->codec->type != rtc::codec::Codec::OPUS) {
debugMessage(this->server_id(), "{} Got unknown codec ({})!", CLIENT_STR_LOG_PREFIX_(this->owner()), channel->codec->type);
//debugMessage(this->server_id(), "{} Got unknown codec ({})!", CLIENT_STR_LOG_PREFIX_(this->owner()), channel->codec->type);
return;
}
auto ac = _audio_channel.lock();
if(!ac) return;
this->handle_audio_voice_x_data(&this->voice_state, data, payload_offset);
}
for(const auto& ext : ac->list_extensions(rtc::direction::incoming)) {
void VoiceBridge::handle_audio_voice_whisper_data(const std::shared_ptr<rtc::MediaChannel> &channel, const pipes::buffer_view &data, size_t payload_offset) {
if(channel->codec->type != rtc::codec::Codec::OPUS) {
return;
}
this->handle_audio_voice_x_data(&this->whisper_state, data, payload_offset);
}
void VoiceBridge::handle_audio_voice_x_data(VoiceStateData *state, const pipes::buffer_view &data, size_t payload_offset) {
bool is_silence{false};
auto audio_channel = state->channel.lock();
if(!audio_channel) {
return;
}
for(const auto& ext : audio_channel->list_extensions(rtc::direction::incoming)) {
if(ext->name == "urn:ietf:params:rtp-hdrext:ssrc-audio-level") {
int level;
if(rtc::protocol::rtp_header_extension_parse_audio_level(data, ext->id, &level) == 0) {
//debugMessage(this->server_id(), "Audio level: {}", level);
if(level == 127) return; //Silence
if(level == 127) {
is_silence = true;
break;
}
}
break;
}
}
//int level;
//rtc::protocol::rtp_header_extension_parse_audio_level((char*) data.data(), data.length(), 1, &level);
auto target_buffer = buffer::allocate_buffer(data.length() - payload_offset + 3);
le2be16(this->voice.packet_id++, (char*) target_buffer.data_ptr());
target_buffer[2] = 5;
memcpy(&target_buffer[3], &data[payload_offset], data.length() - payload_offset);
this->callback_voice_data(target_buffer, this->voice.packet_id < 7, false);
if(is_silence) {
if(state->muted) {
/* the muted state is already set */
return;
}
state->muted = true;
auto target_buffer = buffer::allocate_buffer(3);
le2be16(state->sequence_packet_id++, (char*) target_buffer.data_ptr());
target_buffer[2] = 5;
state->callback(target_buffer, false);
} else {
if(state->muted) {
state->muted = false;
}
auto target_buffer = buffer::allocate_buffer(data.length() - payload_offset + 3);
le2be16(state->sequence_packet_id++, (char*) target_buffer.data_ptr());
target_buffer[2] = 5;
memcpy(&target_buffer[3], &data[payload_offset], data.length() - payload_offset);
state->callback(target_buffer, state->sequence_packet_id < 7);
}
}

View File

@ -11,8 +11,7 @@ namespace ts {
namespace web {
class VoiceBridge {
public:
typedef std::function<void(const pipes::buffer_view&, bool, bool)> cb_voice_data;
typedef std::function<void(const pipes::buffer_view&, bool)> cb_voice_whisper_data;
typedef std::function<void(const pipes::buffer_view&, bool /* is sequence start */)> cb_voice_data;
typedef std::function<void(const rtc::IceCandidate&)> cb_ice_candidate;
typedef std::function<void(const std::string& /* sdpMid */, int /* sdpMLineIndex */)> cb_ice_candidate_finish;
typedef std::function<void()> cb_initialized;
@ -33,12 +32,20 @@ namespace ts {
cb_ice_candidate callback_ice_candidate;
cb_ice_candidate_finish callback_ice_candidate_finished;
cb_voice_data callback_voice_data;
cb_voice_whisper_data callback_voice_whisper_data;
cb_voice_data callback_voice_whisper_data;
cb_initialized callback_initialized;
cb_failed callback_failed;
void execute_tick();
private:
struct VoiceStateData {
uint16_t sequence_packet_id{0};
bool muted{true};
std::weak_ptr<rtc::AudioChannel>& channel;
cb_voice_data& callback;
};
static void callback_log(void* ptr, pipes::Logger::LogLevel level, const std::string& name, const std::string& message, ...);
inline int server_id();
@ -46,20 +53,30 @@ namespace ts {
void handle_media_stream(const std::shared_ptr<rtc::Channel>& /* stream */);
void handle_data_channel(const std::shared_ptr<rtc::DataChannel> & /* channel */);
void handle_audio_data(const std::shared_ptr<rtc::MediaChannel>& /* channel */, const pipes::buffer_view& /* buffer */, size_t /* payload offset */);
void handle_audio_voice_data(const std::shared_ptr<rtc::MediaChannel>& /* channel */, const pipes::buffer_view& /* buffer */, size_t /* payload offset */);
void handle_audio_voice_whisper_data(const std::shared_ptr<rtc::MediaChannel>& /* channel */, const pipes::buffer_view& /* buffer */, size_t /* payload offset */);
static void handle_audio_voice_x_data(VoiceStateData* /* state */, const pipes::buffer_view& /* buffer */, size_t /* payload offset */);
std::weak_ptr<server::WebClient> _owner;
std::chrono::system_clock::time_point offer_timestamp;
std::unique_ptr<rtc::PeerConnection> connection;
std::shared_ptr<rtc::DataChannel> voice_channel_;
std::shared_ptr<rtc::DataChannel> voice_whisper_channel_;
std::shared_ptr<rtc::DataChannel> voice_channel_{};
std::shared_ptr<rtc::DataChannel> voice_whisper_channel_{};
std::weak_ptr<rtc::AudioChannel> _audio_channel;
struct {
uint16_t packet_id = 0;
bool muted = true;
} voice;
std::weak_ptr<rtc::AudioChannel> incoming_voice_channel_{};
std::weak_ptr<rtc::AudioChannel> incoming_whisper_channel_{};
VoiceStateData voice_state{
.channel = this->incoming_voice_channel_,
.callback = this->callback_voice_data
};
VoiceStateData whisper_state{
.channel = this->incoming_whisper_channel_,
.callback = this->callback_voice_whisper_data
};
};
}
}

View File

@ -12,7 +12,7 @@ using namespace ts::protocol;
void WebClient::handleMessageWrite(int fd, short, void *) {
auto self_lock = _this.lock();
unique_lock buffer_lock(this->queue_lock);
unique_lock buffer_lock(this->queue_mutex);
if(this->queue_write.empty()) return;
auto buffer = this->queue_write[0];
@ -85,7 +85,7 @@ void WebClient::handleMessageRead(int fd, short, void *) {
pbuffer.write(buffer, length);
{
lock_guard lock(this->queue_lock);
lock_guard lock(this->queue_mutex);
this->queue_read.push_back(std::move(pbuffer));
}
@ -95,7 +95,7 @@ void WebClient::handleMessageRead(int fd, short, void *) {
void WebClient::enqueue_raw_packet(const pipes::buffer_view &msg) {
auto buffer = msg.owns_buffer() ? msg.own_buffer() : msg.own_buffer(); /* TODO: Use buffer::allocate_buffer(...) */
{
lock_guard queue_lock(this->queue_lock);
lock_guard queue_lock(this->queue_mutex);
this->queue_write.push_back(buffer);
}
{
@ -124,11 +124,11 @@ inline bool is_ssl_handshake_header(const pipes::buffer_view& buffer) {
}
void WebClient::processNextMessage(const std::chrono::system_clock::time_point& /* scheduled */) {
lock_guard execute_lock(this->execute_lock);
lock_guard execute_lock(this->execute_mutex);
if(this->state != ConnectionState::INIT_HIGH && this->state != ConnectionState::INIT_LOW && this->state != ConnectionState::CONNECTED)
return;
unique_lock buffer_lock(this->queue_lock);
unique_lock buffer_lock(this->queue_mutex);
if(this->queue_read.empty())
return;
@ -152,6 +152,7 @@ void WebClient::processNextMessage(const std::chrono::system_clock::time_point&
this->ws_handler.process_incoming_data(buffer);
}
if(has_next)
if(has_next) {
this->registerMessageProcess();
}
}

View File

@ -220,7 +220,7 @@ bool WebClient::close_connection(const std::chrono::system_clock::time_point& ti
flag_flushed = true;
{
lock_guard lock(self_lock->queue_lock);
lock_guard lock(self_lock->queue_mutex);
flag_flushed &= self_lock->queue_read.empty();
flag_flushed &= self_lock->queue_write.empty();
}
@ -265,6 +265,11 @@ command_result WebClient::handleCommand(Command &command) {
return result;
}
}
if(command.command() == "setwhispertarget") {
return this->handleCommandSetWhisperTarget(command);
} else if(command.command() == "clearwhispertarget") {
return this->handleCommandClearWhisperTarget(command);
}
return SpeakingClient::handleCommand(command);
}
@ -333,7 +338,7 @@ void WebClient::onWSDisconnected(const string& error) {
void WebClient::onWSMessage(const pipes::WSMessage &message) {
if(message.code == pipes::OpCode::TEXT)
this->handleMessage(message.data.string());
this->handleMessage(message.data);
else if(message.code == pipes::OpCode::PING) {
logTrace(this->getServerId(), "{} Received ping on web socket. Application data length: {}. Sending pong", CLIENT_STR_LOG_PREFIX, message.data.length());
this->ws_handler.send({pipes::PONG, message.data});
@ -374,7 +379,7 @@ void WebClient::disconnectFinal() {
auto self_lock = this->_this.lock();
{
/* waiting to finish all executes */
lock_guard lock(this->execute_lock);
lock_guard lock(this->execute_mutex);
}
if(this->flush_thread.get_id() == this_thread::get_id())
@ -431,22 +436,24 @@ Json::CharReaderBuilder json_reader_builder = []() noexcept {
return reader_builder;
}();
void WebClient::handleMessage(const std::string &message) {
void WebClient::handleMessage(const pipes::buffer_view &message) {
/* Not really a need, this will directly be called via the ssl ws pipe, which has been triggered via progress message */
threads::MutexLock lock(this->execute_lock);
threads::MutexLock lock(this->execute_mutex);
Json::Value val;
try {
unique_ptr<Json::CharReader> reader{json_reader_builder.newCharReader()};
string error_message;
if(!reader->parse(message.data(),message.data() + message.length(), &val, &error_message))
if(!reader->parse(message.data_ptr<char>(),message.data_ptr<char>() + message.length(), &val, &error_message)) {
throw Json::Exception("Could not parse payload! (" + error_message + ")");
}
} catch (const std::exception& ex) {
logError(this->server->getServerId(), "Could not parse web message! Message: " + string(ex.what()));
logTrace(this->server->getServerId(), "Payload: " + message);
logError(this->server->getServerId(), "Could not parse web message! Message: {}", std::string{ex.what()});
logTrace(this->server->getServerId(), "Payload: {}", message.string());
return;
}
logTrace(this->server->getServerId(), "[{}] Read message {}", CLIENT_STR_LOG_PREFIX_(this), message);
logTrace(this->server->getServerId(), "[{}] Read message {}", CLIENT_STR_LOG_PREFIX_(this), std::string_view{message.data_ptr<char>(), message.length()});
try {
if(val["type"].isNull()) {
logError(this->server->getServerId(), "[{}] Invalid web json package!");
@ -478,15 +485,40 @@ void WebClient::handleMessage(const std::string &message) {
this->voice_bridge = make_unique<web::VoiceBridge>(dynamic_pointer_cast<WebClient>(this->ref())); //FIXME Add config
this->voice_bridge->callback_voice_data = [&](const pipes::buffer_view& buffer, bool a, bool b) {
this->voice_bridge->callback_voice_data = [&](const pipes::buffer_view& buffer, bool head) {
/* may somehow get the "real" packet size? */
this->connectionStatistics->logIncomingPacket(stats::ConnectionStatistics::category::VOICE, buffer.length());
this->handlePacketVoice(buffer, a, b);
this->handlePacketVoice(buffer, head, false);
};
this->voice_bridge->callback_voice_whisper_data = [&](const pipes::buffer_view& buffer, bool a) {
this->voice_bridge->callback_voice_whisper_data = [&](const pipes::buffer_view& buffer, bool head) {
/* may somehow get the "real" packet size? */
this->connectionStatistics->logIncomingPacket(stats::ConnectionStatistics::category::VOICE, buffer.length());
this->handlePacketVoiceWhisper(buffer, a);
constexpr static auto kTempBufferSize{2048};
char temp_buffer[kTempBufferSize];
size_t offset{0};
/* copy the voice header */
memcpy(temp_buffer, buffer.data_ptr(), 3);
offset += 3;
bool is_new;
{
std::lock_guard whisper_header_lock{this->whisper.mutex};
if(!this->whisper.is_set) {
return;
}
memcpy(temp_buffer + offset, this->whisper.target_header.data_ptr(), this->whisper.target_header.length());
offset += this->whisper.target_header.length();
is_new = this->whisper.is_new_header;
}
memcpy(temp_buffer + offset, buffer.data_ptr<char>() + 3, buffer.length() - 3);
offset += buffer.length() - 3;
this->handlePacketVoiceWhisper(pipes::buffer_view{temp_buffer, offset}, is_new, head);
};
this->voice_bridge->callback_initialized = [&](){
debugMessage(this->getServerId(), "{} Voice bridge initialized!", CLIENT_STR_LOG_PREFIX);
@ -697,16 +729,116 @@ void WebClient::send_voice_packet(const pipes::buffer_view &view, const Speaking
}
}
void WebClient::send_voice_whisper_packet(const pipes::buffer_view &view, const SpeakingClient::VoicePacketFlags &flags) {
std::shared_lock read_voice_bridge_lock(this->voice_bridge_lock);
void WebClient::send_voice_whisper_packet(const pipes::buffer_view &, const pipes::buffer_view &teaspeak_packet, const SpeakingClient::VoicePacketFlags &flags) {
std::shared_lock read_voice_bridge_lock{this->voice_bridge_lock};
if(this->voice_bridge) {
auto channel = this->voice_bridge->voice_whisper_channel();
if(channel) {
channel->send(view);
uint8_t buffer[teaspeak_packet.length() + 1];
memcpy(buffer + 1, teaspeak_packet.data_ptr(), teaspeak_packet.length());
buffer[0] = 0;
if(flags.head) {
buffer[0] |= 0x1U;
}
channel->send(pipes::buffer{buffer, teaspeak_packet.length() + 1});
read_voice_bridge_lock.unlock();
/* may somehow get the "real" packet size? */
this->connectionStatistics->logOutgoingPacket(stats::ConnectionStatistics::category::VOICE, view.length());
this->connectionStatistics->logOutgoingPacket(stats::ConnectionStatistics::category::VOICE, teaspeak_packet.length());
}
}
}
command_result WebClient::handleCommandSetWhisperTarget(Command &command) {
auto server = this->getServer();
if(!server) {
return command_result{error::server_unbound};
}
if(command.hasParm("new")) {
auto type = command["type"].as<uint8_t>();
auto target = command["target"].as<uint8_t>();
auto target_id = command["id"].as<uint64_t>();
std::lock_guard whisper_buffer_lock{this->whisper.mutex};
this->whisper.is_set = true;
this->whisper.is_new_header = true;
this->whisper.target_header.resize(10);
this->whisper.target_header[0] = type;
this->whisper.target_header[1] = target;
le2be64(target_id, &this->whisper.target_header[2]);
return command_result{error::ok};
} else {
if(command.bulkCount() > 255) {
return command_result{error::parameter_invalid_count};
}
std::vector<ClientId> client_ids{};
std::vector<ChannelId> channel_ids{};
client_ids.reserve(command.bulkCount());
channel_ids.reserve(command.bulkCount());
std::optional<decltype(server->getClients())> server_clients{};
for(size_t bulk{0}; bulk < command.bulkCount(); bulk++) {
if(command[bulk].has("cid")) {
channel_ids.push_back(command[bulk]["cid"]);
}
if(command[bulk].has("clid")) {
channel_ids.push_back(command[bulk]["clid"]);
}
if(command[bulk].has("cluid")) {
auto client_unique_id = command[bulk]["cluid"].string();
if(!server_clients.has_value()) {
server_clients = server->getClients();
}
for(const auto& client : *server_clients) {
if(client->getUid() == client_unique_id) {
client_ids.push_back(client->getClientId());
}
}
}
}
/* check if we're exceeding the protocol limit */
if(client_ids.size() > 255) {
return command_result{error::whisper_too_many_targets};
}
if(channel_ids.size() > 255) {
return command_result{error::whisper_too_many_targets};
}
/* generate the whisper target header */
std::lock_guard whisper_buffer_lock{this->whisper.mutex};
this->whisper.is_set = true;
this->whisper.is_new_header = false;
this->whisper.target_header.resize(client_ids.size() * 2 + channel_ids.size() * 8 + 2);
static_assert(sizeof(ChannelId) == 8);
static_assert(sizeof(ClientId) == 2);
size_t offset{0};
this->whisper.target_header[0] = channel_ids.size();
this->whisper.target_header[1] = client_ids.size();
offset += 2;
memcpy(this->whisper.target_header.data_ptr<char>() + offset, channel_ids.data(), channel_ids.size() * 8);
offset += channel_ids.size() * 8;
memcpy(this->whisper.target_header.data_ptr<char>() + offset, client_ids.data(), client_ids.size() * 2);
//offset += channel_ids.size() * 2;
}
return command_result{error::ok};
}
command_result WebClient::handleCommandClearWhisperTarget(Command &command) {
std::lock_guard whisper_buffer_lock{this->whisper.mutex};
this->whisper.is_set = false;
return command_result{error::ok};
}

View File

@ -33,9 +33,9 @@ namespace ts::server {
bool shouldReceiveVoice(const std::shared_ptr<ConnectedClient> &sender) override;
inline std::chrono::nanoseconds client_ping() { return this->client_ping_layer_7(); }
inline std::chrono::nanoseconds client_ping_layer_5() { return this->ping.value; }
inline std::chrono::nanoseconds client_ping_layer_7() { return this->js_ping.value; }
[[nodiscard]] inline std::chrono::nanoseconds client_ping() const { return this->client_ping_layer_7(); }
[[nodiscard]] inline std::chrono::nanoseconds client_ping_layer_5() const { return this->ping.value; }
[[nodiscard]] inline std::chrono::nanoseconds client_ping_layer_7() const { return this->js_ping.value; }
protected:
void tick(const std::chrono::system_clock::time_point&) override; /* Every 500ms */
@ -43,18 +43,23 @@ namespace ts::server {
void applySelfLock(const std::shared_ptr<WebClient> &cl){ _this = cl; }
private:
WebControlServer* handle;
std::chrono::time_point<std::chrono::system_clock> connectedTimestamp;
std::shared_mutex voice_bridge_lock;
std::unique_ptr<web::VoiceBridge> voice_bridge;
int file_descriptor;
bool allow_raw_commands{false};
bool ssl_detected{false};
bool ssl_encrypted{true};
pipes::SSL ssl_handler;
pipes::WebSocket ws_handler;
std::mutex event_lock;
::event* readEvent;
::event* writeEvent;
struct {
uint8_t current_id = 0;
uint8_t current_id{0};
std::chrono::system_clock::time_point last_request;
std::chrono::system_clock::time_point last_response;
@ -63,7 +68,7 @@ namespace ts::server {
} ping;
struct {
uint8_t current_id = 0;
uint8_t current_id{0};
std::chrono::system_clock::time_point last_request;
std::chrono::system_clock::time_point last_response;
@ -71,21 +76,23 @@ namespace ts::server {
std::chrono::nanoseconds timeout{2000};
} js_ping;
std::mutex queue_lock;
std::mutex queue_mutex;
std::deque<pipes::buffer> queue_read;
std::deque<pipes::buffer> queue_write;
threads::Mutex execute_lock; /* needs to be recursive! */
threads::Mutex execute_mutex; /* needs to be recursive! */
std::thread flush_thread;
std::recursive_mutex close_lock;
struct {
std::mutex mutex{};
pipes::buffer target_header{};
bool is_new_header{false};
bool is_set{false};
} whisper;
private:
void initialize();
bool allow_raw_commands{false};
bool ssl_detected{false};
bool ssl_encrypted{true};
pipes::SSL ssl_handler;
pipes::WebSocket ws_handler;
void handleMessageRead(int, short, void*);
void handleMessageWrite(int, short, void*);
void enqueue_raw_packet(const pipes::buffer_view& /* buffer */);
@ -101,16 +108,19 @@ namespace ts::server {
void onWSMessage(const pipes::WSMessage&);
protected:
void disconnectFinal();
void handleMessage(const std::string &);
void handleMessage(const pipes::buffer_view&);
public:
void send_voice_packet(const pipes::buffer_view &view, const VoicePacketFlags &flags) override;
void send_voice_whisper_packet(const pipes::buffer_view &view, const VoicePacketFlags &flags) override;
void send_voice_whisper_packet(const pipes::buffer_view &/* teamspeak packet */, const pipes::buffer_view &/* teaspeak packet */, const VoicePacketFlags &flags) override;
protected:
command_result handleCommand(Command &command) override;
command_result handleCommandClientInit(Command &command) override;
command_result handleCommandSetWhisperTarget(Command &command);
command_result handleCommandClearWhisperTarget(Command &command);
};
}
#endif

View File

@ -273,7 +273,7 @@ void VoiceServer::handleMessageRead(int fd, short events, void *_event_handle) {
if((message.msg_flags & MSG_TRUNC) > 0)
logError(ts_server->getServerId(), "Received truncated message from {}", net::to_string(remote_address));
if(bytes_read < 0){
if(bytes_read < 0) {
if(errno == EAGAIN)
break;
//Nothing more to read

2
shared

@ -1 +1 @@
Subproject commit 4d7fabe2eae09068e0c3bb47e1b0d5d08df11e45
Subproject commit 0a960e414811bae5081b45219aad97f6cff5c512