Some changes

This commit is contained in:
WolverinDEV
2020-08-13 12:58:19 +02:00
parent 7fdd272d76
commit 7dcf4a54ef
13 changed files with 310 additions and 225 deletions
+39 -18
View File
@@ -145,7 +145,7 @@ std::string VoiceBridge::generate_answer() {
}
void VoiceBridge::execute_tick() {
if(!this->_voice_channel) {
if(!this->voice_channel_) {
if(this->offer_timestamp.time_since_epoch().count() > 0 && this->offer_timestamp + chrono::seconds{20} < chrono::system_clock::now()) {
this->offer_timestamp = chrono::system_clock::time_point();
this->connection->callback_setup_fail(rtc::PeerConnection::ConnectionComponent::BASE, "setup timeout");
@@ -182,26 +182,47 @@ void VoiceBridge::handle_media_stream(const std::shared_ptr<rtc::Channel> &undef
}
void VoiceBridge::handle_data_channel(const std::shared_ptr<rtc::DataChannel> &channel) {
if(channel->lable() == "main") {
this->_voice_channel = channel;
if(channel->lable() == "main" || channel->lable() == "voice") {
this->voice_channel_ = channel;
debugMessage(this->server_id(), "{} Got voice channel!", CLIENT_STR_LOG_PREFIX_(this->owner()));
this->callback_initialized();
weak_ptr<rtc::DataChannel> weak_channel = channel;
channel->callback_binary = [&, weak_channel](const pipes::buffer_view& buffer) {
if(buffer.length() < 2)
return;
this->callback_voice_data(buffer.view(2), buffer[0] == 1, buffer[1] == 1); /* buffer.substr(2), buffer[0] == 1, buffer[1] == 1 */
};
channel->callback_close = [&, weak_channel] {
auto channel_ref = weak_channel.lock();
if(channel_ref == this->voice_channel_) {
this->voice_channel_ = nullptr;
//TODO may callback?
debugMessage(this->server_id(), "{} Voice channel disconnected!", CLIENT_STR_LOG_PREFIX_(this->owner()));
}
};
} else if(channel->lable() == "voice-whisper") {
this->voice_whisper_channel_ = channel;
debugMessage(this->server_id(), "{} Got voice whisper channel", CLIENT_STR_LOG_PREFIX_(this->owner()));
weak_ptr<rtc::DataChannel> weak_channel = channel;
channel->callback_binary = [&, weak_channel](const pipes::buffer_view& buffer) {
if(buffer.length() < 1)
return;
this->callback_voice_whisper_data(buffer.view(2), buffer[0] == 1);
};
channel->callback_close = [&, weak_channel] {
auto channel_ref = weak_channel.lock();
if(channel_ref == this->voice_whisper_channel_) {
this->voice_whisper_channel_ = nullptr;
debugMessage(this->server_id(), "{} Voice whisper channel has been closed.", CLIENT_STR_LOG_PREFIX_(this->owner()));
}
};
}
weak_ptr<rtc::DataChannel> weak_channel = channel;
channel->callback_binary = [&, weak_channel](const pipes::buffer_view& buffer) {
if(buffer.length() < 2)
return;
this->callback_voice_data(buffer.view(2), buffer[0] == 1, buffer[1] == 1); /* buffer.substr(2), buffer[0] == 1, buffer[1] == 1 */
};
channel->callback_close = [&, channel] {
if(channel == this->_voice_channel) {
this->_voice_channel = nullptr;
//TODO may callback?
debugMessage(this->server_id(), "{} Voice channel disconnected!", CLIENT_STR_LOG_PREFIX_(this->owner()));
}
};
}
void VoiceBridge::handle_audio_data(const std::shared_ptr<rtc::MediaChannel> &channel, const pipes::buffer_view &data, size_t payload_offset) {
+8 -2
View File
@@ -12,12 +12,14 @@ namespace ts {
class VoiceBridge {
public:
typedef std::function<void(const pipes::buffer_view&, bool, bool)> cb_voice_data;
typedef std::function<void(const pipes::buffer_view&, bool)> cb_voice_whisper_data;
typedef std::function<void(const rtc::IceCandidate&)> cb_ice_candidate;
typedef std::function<void(const std::string& /* sdpMid */, int /* sdpMLineIndex */)> cb_ice_candidate_finish;
typedef std::function<void()> cb_initialized;
typedef std::function<void()> cb_failed;
std::shared_ptr<rtc::DataChannel> voice_channel() { return this->_voice_channel; }
std::shared_ptr<rtc::DataChannel> voice_channel() { return this->voice_channel_; }
std::shared_ptr<rtc::DataChannel> voice_whisper_channel() { return this->voice_whisper_channel_; }
explicit VoiceBridge(const std::shared_ptr<server::WebClient>&);
virtual ~VoiceBridge();
@@ -31,6 +33,7 @@ namespace ts {
cb_ice_candidate callback_ice_candidate;
cb_ice_candidate_finish callback_ice_candidate_finished;
cb_voice_data callback_voice_data;
cb_voice_whisper_data callback_voice_whisper_data;
cb_initialized callback_initialized;
cb_failed callback_failed;
@@ -48,7 +51,10 @@ namespace ts {
std::weak_ptr<server::WebClient> _owner;
std::chrono::system_clock::time_point offer_timestamp;
std::unique_ptr<rtc::PeerConnection> connection;
std::shared_ptr<rtc::DataChannel> _voice_channel;
std::shared_ptr<rtc::DataChannel> voice_channel_;
std::shared_ptr<rtc::DataChannel> voice_whisper_channel_;
std::weak_ptr<rtc::AudioChannel> _audio_channel;
struct {
uint16_t packet_id = 0;
+49 -28
View File
@@ -156,22 +156,29 @@ void WebClient::sendJson(const Json::Value& json) {
}
void WebClient::sendCommand(const ts::Command &command, bool low) {
Json::Value value = command.buildJson();
value["type"] = "command";
this->sendJson(value);
if(this->allow_raw_commands) {
Json::Value value{};
value["type"] = "command-raw";
value["payload"] = command.build();
this->sendJson(value);
} else {
Json::Value value = command.buildJson();
value["type"] = "command";
this->sendJson(value);
}
}
void WebClient::sendCommand(const ts::command_builder &command, bool low) {
#if false
Json::Value value{};
value["type"] = "command2";
value["payload"] = command.build();
this->sendJson(value);
#else
auto data = command.build();
Command parsed_command = Command::parse(pipes::buffer_view{data.data(), data.length()}, true, false);
this->sendCommand(parsed_command, low);
#endif
if(this->allow_raw_commands) {
Json::Value value{};
value["type"] = "command-raw";
value["payload"] = command.build();
this->sendJson(value);
} else {
auto data = command.build();
Command parsed_command = Command::parse(pipes::buffer_view{data.data(), data.length()}, true, false);
this->sendCommand(parsed_command, low);
}
}
bool WebClient::close_connection(const std::chrono::system_clock::time_point& timeout) {
@@ -418,7 +425,7 @@ void WebClient::disconnectFinal() {
this->handle->unregisterConnection(static_pointer_cast<WebClient>(self_lock));
}
Json::CharReaderBuilder json_reader_builder = []{
Json::CharReaderBuilder json_reader_builder = []() noexcept {
Json::CharReaderBuilder reader_builder;
return reader_builder;
@@ -432,7 +439,8 @@ void WebClient::handleMessage(const std::string &message) {
unique_ptr<Json::CharReader> reader{json_reader_builder.newCharReader()};
string error_message;
if(!reader->parse(message.data(),message.data() + message.length(), &val, &error_message)) throw Json::Exception("Could not parse payload! (" + error_message + ")");
if(!reader->parse(message.data(),message.data() + message.length(), &val, &error_message))
throw Json::Exception("Could not parse payload! (" + error_message + ")");
} catch (const std::exception& ex) {
logError(this->server->getServerId(), "Could not parse web message! Message: " + string(ex.what()));
logTrace(this->server->getServerId(), "Payload: " + message);
@@ -458,7 +466,7 @@ void WebClient::handleMessage(const std::string &message) {
} else if(val["type"].asString() == "WebRTC") {
auto subType = val["request"].asString();
if(subType == "create") {
unique_lock voice_bridge_lock(this->voice_bridge_lock);
std::unique_lock voice_bridge_lock_{this->voice_bridge_lock};
if(this->voice_bridge) {
logError(this->server->getServerId(), "[{}] Tried to register a WebRTC channel twice!", CLIENT_STR_LOG_PREFIX_(this));
@@ -467,14 +475,19 @@ void WebClient::handleMessage(const std::string &message) {
lock = nullptr;
}).detach();
}
//TODO test if bridge already exists!
this->voice_bridge = make_unique<web::VoiceBridge>(dynamic_pointer_cast<WebClient>(_this.lock())); //FIXME Add config
this->voice_bridge = make_unique<web::VoiceBridge>(dynamic_pointer_cast<WebClient>(this->ref())); //FIXME Add config
this->voice_bridge->callback_voice_data = [&](const pipes::buffer_view& buffer, bool a, bool b) {
/* may somehow get the "real" packet size? */
this->connectionStatistics->logIncomingPacket(stats::ConnectionStatistics::category::VOICE, buffer.length());
this->handlePacketVoice(buffer, a, b);
};
this->voice_bridge->callback_voice_whisper_data = [&](const pipes::buffer_view& buffer, bool a) {
/* may somehow get the "real" packet size? */
this->connectionStatistics->logIncomingPacket(stats::ConnectionStatistics::category::VOICE, buffer.length());
this->handlePacketVoiceWhisper(buffer, a);
};
this->voice_bridge->callback_initialized = [&](){
debugMessage(this->getServerId(), "{} Voice bridge initialized!", CLIENT_STR_LOG_PREFIX);
};
@@ -521,7 +534,7 @@ void WebClient::handleMessage(const std::string &message) {
};
auto vbp = &*this->voice_bridge;
voice_bridge_lock.unlock();
voice_bridge_lock_.unlock();
shared_lock read_voice_bridge_lock(this->voice_bridge_lock);
if(vbp != &*this->voice_bridge) {
@@ -628,6 +641,8 @@ void WebClient::handleMessage(const std::string &message) {
}
this->js_ping.last_response = system_clock::now();
this->js_ping.value = duration_cast<nanoseconds>(this->js_ping.last_response - this->js_ping.last_request);
} else if(val["type"].asString() == "enable-raw-commands") {
this->allow_raw_commands = true;
}
} catch (const std::exception& ex) {
logError(this->server->getServerId(), "Could not handle json packet! Message {}", ex.what());
@@ -664,21 +679,18 @@ command_result WebClient::handleCommandClientInit(Command &command) {
bool WebClient::shouldReceiveVoice(const std::shared_ptr<ConnectedClient> &sender) {
shared_lock read_voice_bridge_lock(this->voice_bridge_lock);
if(!this->voice_bridge || !this->voice_bridge->voice_channel()) return false;
return SpeakingClient::shouldReceiveVoice(sender);
}
void WebClient::handlePacketVoiceWhisper(const pipes::buffer_view &string, bool flag) {
shared_lock read_voice_bridge_lock(this->voice_bridge_lock);
if(!this->voice_bridge || !this->voice_bridge->voice_channel()) return;
SpeakingClient::handlePacketVoiceWhisper(string, flag);
}
void WebClient::send_voice_packet(const pipes::buffer_view &view, const SpeakingClient::VoicePacketFlags &flags) {
shared_lock read_voice_bridge_lock(this->voice_bridge_lock);
std::shared_lock read_voice_bridge_lock(this->voice_bridge_lock);
if(this->voice_bridge) {
auto channel = this->voice_bridge->voice_channel();
if(channel) {
channel->send(view);
read_voice_bridge_lock.unlock();
/* may somehow get the "real" packet size? */
this->connectionStatistics->logOutgoingPacket(stats::ConnectionStatistics::category::VOICE, view.length());
}
@@ -686,6 +698,15 @@ void WebClient::send_voice_packet(const pipes::buffer_view &view, const Speaking
}
void WebClient::send_voice_whisper_packet(const pipes::buffer_view &view, const SpeakingClient::VoicePacketFlags &flags) {
logError(this->server->getServerId(), "Web client got whisper packet");
//As well log the data!
std::shared_lock read_voice_bridge_lock(this->voice_bridge_lock);
if(this->voice_bridge) {
auto channel = this->voice_bridge->voice_whisper_channel();
if(channel) {
channel->send(view);
read_voice_bridge_lock.unlock();
/* may somehow get the "real" packet size? */
this->connectionStatistics->logOutgoingPacket(stats::ConnectionStatistics::category::VOICE, view.length());
}
}
}
+76 -78
View File
@@ -14,105 +14,103 @@
#include <json/json.h>
#include <EventLoop.h>
namespace ts {
namespace server {
class WebControlServer;
namespace ts::server {
class WebControlServer;
class WebClient : public SpeakingClient {
friend class WebControlServer;
public:
WebClient(WebControlServer*, int socketFd);
~WebClient() override;
class WebClient : public SpeakingClient {
friend class WebControlServer;
public:
WebClient(WebControlServer*, int socketFd);
~WebClient() override;
void sendJson(const Json::Value&);
void sendCommand(const ts::Command &command, bool low = false) override;
void sendCommand(const ts::command_builder &command, bool low) override;
void sendJson(const Json::Value&);
void sendCommand(const ts::Command &command, bool low) override;
void sendCommand(const ts::command_builder &command, bool low) override;
bool disconnect(const std::string &reason) override;
bool close_connection(const std::chrono::system_clock::time_point& timeout = std::chrono::system_clock::time_point()) override;
bool disconnect(const std::string &reason) override;
bool close_connection(const std::chrono::system_clock::time_point& timeout = std::chrono::system_clock::time_point()) override;
bool shouldReceiveVoice(const std::shared_ptr<ConnectedClient> &sender) override;
bool shouldReceiveVoice(const std::shared_ptr<ConnectedClient> &sender) override;
inline std::chrono::nanoseconds client_ping() { return this->client_ping_layer_7(); }
inline std::chrono::nanoseconds client_ping_layer_5() { return this->ping.value; }
inline std::chrono::nanoseconds client_ping_layer_7() { return this->js_ping.value; }
protected:
void handlePacketVoiceWhisper(const pipes::buffer_view &string, bool) override;
protected:
void tick(const std::chrono::system_clock::time_point&) override; /* Every 500ms */
inline std::chrono::nanoseconds client_ping() { return this->client_ping_layer_7(); }
inline std::chrono::nanoseconds client_ping_layer_5() { return this->ping.value; }
inline std::chrono::nanoseconds client_ping_layer_7() { return this->js_ping.value; }
void applySelfLock(const std::shared_ptr<WebClient> &cl){ _this = cl; }
private:
WebControlServer* handle;
std::chrono::time_point<std::chrono::system_clock> connectedTimestamp;
protected:
void tick(const std::chrono::system_clock::time_point&) override; /* Every 500ms */
std::shared_mutex voice_bridge_lock;
std::unique_ptr<web::VoiceBridge> voice_bridge;
int file_descriptor;
void applySelfLock(const std::shared_ptr<WebClient> &cl){ _this = cl; }
private:
WebControlServer* handle;
std::chrono::time_point<std::chrono::system_clock> connectedTimestamp;
std::mutex event_lock;
::event* readEvent;
::event* writeEvent;
std::shared_mutex voice_bridge_lock;
std::unique_ptr<web::VoiceBridge> voice_bridge;
int file_descriptor;
struct {
uint8_t current_id = 0;
std::chrono::system_clock::time_point last_request;
std::chrono::system_clock::time_point last_response;
std::mutex event_lock;
::event* readEvent;
::event* writeEvent;
std::chrono::nanoseconds value{};
std::chrono::nanoseconds timeout{2000};
} ping;
struct {
uint8_t current_id = 0;
std::chrono::system_clock::time_point last_request;
std::chrono::system_clock::time_point last_response;
struct {
uint8_t current_id = 0;
std::chrono::system_clock::time_point last_request;
std::chrono::system_clock::time_point last_response;
std::chrono::nanoseconds value{};
std::chrono::nanoseconds timeout{2000};
} ping;
std::chrono::nanoseconds value{};
std::chrono::nanoseconds timeout{2000};
} js_ping;
struct {
uint8_t current_id = 0;
std::chrono::system_clock::time_point last_request;
std::chrono::system_clock::time_point last_response;
std::mutex queue_lock;
std::deque<pipes::buffer> queue_read;
std::deque<pipes::buffer> queue_write;
threads::Mutex execute_lock; /* needs to be recursive! */
std::chrono::nanoseconds value{};
std::chrono::nanoseconds timeout{2000};
} js_ping;
std::thread flush_thread;
std::recursive_mutex close_lock;
private:
void initialize();
std::mutex queue_lock;
std::deque<pipes::buffer> queue_read;
std::deque<pipes::buffer> queue_write;
threads::Mutex execute_lock; /* needs to be recursive! */
bool ssl_detected = false;
bool ssl_encrypted = true;
pipes::SSL ssl_handler;
pipes::WebSocket ws_handler;
void handleMessageRead(int, short, void*);
void handleMessageWrite(int, short, void*);
void enqueue_raw_packet(const pipes::buffer_view& /* buffer */);
std::thread flush_thread;
std::recursive_mutex close_lock;
private:
void initialize();
void processNextMessage(const std::chrono::system_clock::time_point& /* scheduled */);
void registerMessageProcess();
bool allow_raw_commands{false};
bool ssl_detected{false};
bool ssl_encrypted{true};
pipes::SSL ssl_handler;
pipes::WebSocket ws_handler;
void handleMessageRead(int, short, void*);
void handleMessageWrite(int, short, void*);
void enqueue_raw_packet(const pipes::buffer_view& /* buffer */);
std::shared_ptr<event::ProxiedEventEntry<WebClient>> event_handle_packet;
void processNextMessage(const std::chrono::system_clock::time_point& /* scheduled */);
void registerMessageProcess();
//WS events
void onWSConnected();
void onWSDisconnected(const std::string& reason);
void onWSMessage(const pipes::WSMessage&);
protected:
void disconnectFinal();
void handleMessage(const std::string &);
std::shared_ptr<event::ProxiedEventEntry<WebClient>> event_handle_packet;
public:
void send_voice_packet(const pipes::buffer_view &view, const VoicePacketFlags &flags) override;
void send_voice_whisper_packet(const pipes::buffer_view &view, const VoicePacketFlags &flags) override;
//WS events
void onWSConnected();
void onWSDisconnected(const std::string& reason);
void onWSMessage(const pipes::WSMessage&);
protected:
void disconnectFinal();
void handleMessage(const std::string &);
protected:
public:
void send_voice_packet(const pipes::buffer_view &view, const VoicePacketFlags &flags) override;
void send_voice_whisper_packet(const pipes::buffer_view &view, const VoicePacketFlags &flags) override;
command_result handleCommand(Command &command) override;
command_result handleCommandClientInit(Command &command) override;
};
}
protected:
command_result handleCommand(Command &command) override;
command_result handleCommandClientInit(Command &command) override;
};
}
#endif