From 297e4c8755a971fc643c958b9cc6989188b0cf29 Mon Sep 17 00:00:00 2001 From: Bill Somerville Date: Wed, 21 Oct 2015 18:56:29 +0000 Subject: [PATCH] Implement schema negotiation for the UDP protocol The MessageClient and MessageServer classes now agree a maximum common schema number for the protocol described in NetworkMessage.hpp. this is achieved by the client sending a Heartbeat message specifying the highest schema number supported, the server responds with messages using the minimum of its highest supported schema number and the highest schema number sent by the client in its initial Heartbeat message. This mechanism enables clients and servers built with different generations of the message schema to interoperate with minimum loss of functionality. It should be noted that messages may be extended with new fields on the end of the current definition so long as the meaning of the original fields are unchanged. Such an extension does not need the schema number to be incremented. On the other hand, using a newer version of the underlying Qt QDataStream::Version should always increment the schema number since the NetworkMessage::Builder and NetworkMessage::Reader classes need to know which QDataStream::Version to use. git-svn-id: svn+ssh://svn.code.sf.net/p/wsjt/wsjt/branches/wsjtx@5991 ab8295b8-cf94-4d9e-aec4-7959e3be5d79 --- MessageClient.cpp | 26 +++++++++++------ MessageServer.cpp | 63 +++++++++++++++++++++++++++++++--------- NetworkMessage.cpp | 45 ++++++++++++++++++++++++----- NetworkMessage.hpp | 71 +++++++++++++++++++++++++++++++++++++--------- 4 files changed, 163 insertions(+), 42 deletions(-) diff --git a/MessageClient.cpp b/MessageClient.cpp index 15012316f..47e60c45a 100644 --- a/MessageClient.cpp +++ b/MessageClient.cpp @@ -25,6 +25,7 @@ public: : self_ {self} , id_ {id} , server_port_ {server_port} + , schema_ {2} // use 2 prior to negotiation not 1 which is broken , heartbeat_timer_ {new QTimer {this}} { connect (heartbeat_timer_, &QTimer::timeout, this, &impl::heartbeat); @@ -57,6 +58,7 @@ public: QString server_string_; port_type server_port_; QHostAddress server_; + quint32 schema_; QTimer * heartbeat_timer_; // hold messages sent before host lookup completes asynchronously @@ -76,6 +78,9 @@ void MessageClient::impl::host_info_results (QHostInfo host_info) { server_ = host_info.addresses ()[0]; + // send initial heartbeat which allows schema negotiation + heartbeat (); + // clear any backlog while (pending_messages_.size ()) { @@ -107,9 +112,14 @@ void MessageClient::impl::parse_message (QByteArray const& msg) // message format is described in NetworkMessage.hpp // NetworkMessage::Reader in {msg}; - if (OK == check_status (in) && id_ == in.id ()) // OK and for us { + if (schema_ < in.schema ()) // one time record of server's + // negotiated schema + { + schema_ = in.schema (); + } + // // message format is described in NetworkMessage.hpp // @@ -184,7 +194,8 @@ void MessageClient::impl::heartbeat () if (server_port_ && !server_.isNull ()) { QByteArray message; - NetworkMessage::Builder hb {&message, NetworkMessage::Heartbeat, id_}; + NetworkMessage::Builder hb {&message, NetworkMessage::Heartbeat, id_, schema_}; + hb << NetworkMessage::Builder::schema_number; // maximum schema number accepted if (OK == check_status (hb)) { writeDatagram (message, server_, server_port_); @@ -197,7 +208,7 @@ void MessageClient::impl::closedown () if (server_port_ && !server_.isNull ()) { QByteArray message; - NetworkMessage::Builder out {&message, NetworkMessage::Close, id_}; + NetworkMessage::Builder out {&message, NetworkMessage::Close, id_, schema_}; if (OK == check_status (out)) { writeDatagram (message, server_, server_port_); @@ -227,7 +238,6 @@ auto MessageClient::impl::check_status (QDataStream const& stream) const -> Stre switch (stat) { case QDataStream::ReadPastEnd: - qDebug () << __PRETTY_FUNCTION__ << " warning: short UDP message received."; result = Short; break; @@ -300,7 +310,7 @@ void MessageClient::status_update (Frequency f, QString const& mode, QString con if (m_->server_port_ && !m_->server_string_.isEmpty ()) { QByteArray message; - NetworkMessage::Builder out {&message, NetworkMessage::Status, m_->id_}; + NetworkMessage::Builder out {&message, NetworkMessage::Status, m_->id_, m_->schema_}; out << f << mode.toUtf8 () << dx_call.toUtf8 () << report.toUtf8 () << tx_mode.toUtf8 () << tx_enabled << transmitting; if (impl::OK == m_->check_status (out)) @@ -320,7 +330,7 @@ void MessageClient::decode (bool is_new, QTime time, qint32 snr, float delta_tim if (m_->server_port_ && !m_->server_string_.isEmpty ()) { QByteArray message; - NetworkMessage::Builder out {&message, NetworkMessage::Decode, m_->id_}; + NetworkMessage::Builder out {&message, NetworkMessage::Decode, m_->id_, m_->schema_}; out << is_new << time << snr << delta_time << delta_frequency << mode.toUtf8 () << message_text.toUtf8 (); if (impl::OK == m_->check_status (out)) { @@ -338,7 +348,7 @@ void MessageClient::clear_decodes () if (m_->server_port_ && !m_->server_string_.isEmpty ()) { QByteArray message; - NetworkMessage::Builder out {&message, NetworkMessage::Clear, m_->id_}; + NetworkMessage::Builder out {&message, NetworkMessage::Clear, m_->id_, m_->schema_}; if (impl::OK == m_->check_status (out)) { m_->send_message (message); @@ -358,7 +368,7 @@ void MessageClient::qso_logged (QDateTime time, QString const& dx_call, QString if (m_->server_port_ && !m_->server_string_.isEmpty ()) { QByteArray message; - NetworkMessage::Builder out {&message, NetworkMessage::QSOLogged, m_->id_}; + NetworkMessage::Builder out {&message, NetworkMessage::QSOLogged, m_->id_, m_->schema_}; out << time << dx_call.toUtf8 () << dx_grid.toUtf8 () << dial_frequency << mode.toUtf8 () << report_sent.toUtf8 () << report_received.toUtf8 () << tx_power.toUtf8 () << comments.toUtf8 () << name.toUtf8 (); if (impl::OK == m_->check_status (out)) diff --git a/MessageServer.cpp b/MessageServer.cpp index 27f66af9a..0a97287fa 100644 --- a/MessageServer.cpp +++ b/MessageServer.cpp @@ -50,8 +50,20 @@ public: static BindMode const bind_mode_; struct Client { + Client () = default; + Client (QHostAddress const& sender_address, port_type const& sender_port) + : sender_address_ {sender_address} + , sender_port_ {sender_port} + , negotiated_schema_number_ {2} // not 1 because it's broken + , last_activity_ {QDateTime::currentDateTime ()} + { + } + Client (Client const&) = default; + Client& operator= (Client const&) = default; + QHostAddress sender_address_; port_type sender_port_; + quint32 negotiated_schema_number_; QDateTime last_activity_; }; QHash clients_; // maps id to Client @@ -115,16 +127,37 @@ void MessageServer::impl::parse_message (QHostAddress const& sender, port_type s auto id = in.id (); if (OK == check_status (in)) { - bool new_client {false}; if (!clients_.contains (id)) { - new_client = true; - } - clients_[id] = {sender, sender_port, QDateTime::currentDateTime ()}; - if (new_client) - { + auto& client = (clients_[id] = {sender, sender_port}); + + if (NetworkMessage::Heartbeat == in.type ()) + { + // negotiate a working schema number + in >> client.negotiated_schema_number_; + if (OK == check_status (in)) + { + auto sn = NetworkMessage::Builder::schema_number; + client.negotiated_schema_number_ = std::min (sn, client.negotiated_schema_number_); + + // reply to the new client informing it of the + // negotiated schema number + QByteArray message; + NetworkMessage::Builder hb {&message, NetworkMessage::Heartbeat, id, client.negotiated_schema_number_}; + hb << NetworkMessage::Builder::schema_number; // maximum schema number accepted + if (impl::OK == check_status (hb)) + { + writeDatagram (message, client.sender_address_, client.sender_port_); + } + else + { + Q_EMIT self_->error ("Error creating UDP message"); + } + } + } Q_EMIT self_->client_opened (id); } + clients_[id].last_activity_ = QDateTime::currentDateTime (); // // message format is described in NetworkMessage.hpp @@ -233,13 +266,18 @@ void MessageServer::impl::parse_message (QHostAddress const& sender, port_type s void MessageServer::impl::tick () { auto now = QDateTime::currentDateTime (); - for (auto iter = std::begin (clients_); iter != std::end (clients_); ++iter) + auto iter = std::begin (clients_); + while (iter != std::end (clients_)) { if (now > (*iter).last_activity_.addSecs (NetworkMessage::pulse)) { Q_EMIT self_->clear_decodes (iter.key ()); Q_EMIT self_->client_closed (iter.key ()); - clients_.erase (iter); // safe while iterating as doesn't rehash + iter = clients_.erase (iter); // safe while iterating as doesn't rehash + } + else + { + ++iter; } } } @@ -251,7 +289,6 @@ auto MessageServer::impl::check_status (QDataStream const& stream) const -> Stre switch (stat) { case QDataStream::ReadPastEnd: - qDebug () << __PRETTY_FUNCTION__ << " warning: short UDP message received."; result = Short; break; @@ -307,7 +344,7 @@ void MessageServer::reply (QString const& id, QTime time, qint32 snr, float delt if (iter != std::end (m_->clients_)) { QByteArray message; - NetworkMessage::Builder out {&message, NetworkMessage::Reply, id}; + NetworkMessage::Builder out {&message, NetworkMessage::Reply, id, (*iter).negotiated_schema_number_}; out << time << snr << delta_time << delta_frequency << mode.toUtf8 () << message_text.toUtf8 (); if (impl::OK == m_->check_status (out)) { @@ -326,7 +363,7 @@ void MessageServer::replay (QString const& id) if (iter != std::end (m_->clients_)) { QByteArray message; - NetworkMessage::Builder out {&message, NetworkMessage::Replay, id}; + NetworkMessage::Builder out {&message, NetworkMessage::Replay, id, (*iter).negotiated_schema_number_}; if (impl::OK == m_->check_status (out)) { m_->writeDatagram (message, iter.value ().sender_address_, (*iter).sender_port_); @@ -344,7 +381,7 @@ void MessageServer::halt_tx (QString const& id, bool auto_only) if (iter != std::end (m_->clients_)) { QByteArray message; - NetworkMessage::Builder out {&message, NetworkMessage::HaltTx, id}; + NetworkMessage::Builder out {&message, NetworkMessage::HaltTx, id, (*iter).negotiated_schema_number_}; out << auto_only; if (impl::OK == m_->check_status (out)) { @@ -363,7 +400,7 @@ void MessageServer::free_text (QString const& id, QString const& text, bool send if (iter != std::end (m_->clients_)) { QByteArray message; - NetworkMessage::Builder out {&message, NetworkMessage::FreeText, id}; + NetworkMessage::Builder out {&message, NetworkMessage::FreeText, id, (*iter).negotiated_schema_number_}; out << text.toUtf8 () << send; if (impl::OK == m_->check_status (out)) { diff --git a/NetworkMessage.cpp b/NetworkMessage.cpp index 5c7e0f1a0..b1ad0f11b 100644 --- a/NetworkMessage.cpp +++ b/NetworkMessage.cpp @@ -9,23 +9,46 @@ namespace NetworkMessage { - Builder::Builder (QIODevice * device, Type type, QString const& id) + Builder::Builder (QIODevice * device, Type type, QString const& id, quint32 schema) : QDataStream {device} { - common_initialization (type, id); + common_initialization (type, id, schema); } - Builder::Builder (QByteArray * a, Type type, QString const& id) + Builder::Builder (QByteArray * a, Type type, QString const& id, quint32 schema) : QDataStream {a, QIODevice::WriteOnly} { - common_initialization (type, id); + common_initialization (type, id, schema); } - void Builder::common_initialization (Type type, QString const& id) + void Builder::common_initialization (Type type, QString const& id, quint32 schema) { + if (schema <= 1) + { + setVersion (QDataStream::Qt_5_0); // Qt schema version + } +#if QT_VERSION >= 0x050200 + else if (schema <= 2) + { + setVersion (QDataStream::Qt_5_2); // Qt schema version + } +#endif +#if QT_VERSION >= 0x050400 + else if (schema <= 3) + { + setVersion (QDataStream::Qt_5_4); // Qt schema version + } +#endif + else + { + throw std::runtime_error {"Unrecognized message schema"}; + } + + // the following two items assume that the quint32 encoding is + // unchanged over QDataStream versions *this << magic; - *this << schema_number; - setVersion (QDataStream::Qt_5_2); // Qt schema version + *this << schema; + *this << static_cast (type) << id.toUtf8 (); } @@ -49,10 +72,18 @@ namespace NetworkMessage { parent->setVersion (QDataStream::Qt_5_0); } +#if QT_VERSION >= 0x050200 else if (schema_ <= 2) { parent->setVersion (QDataStream::Qt_5_2); } +#endif +#if QT_VERSION >= 0x050400 + else if (schema_ <= 3) + { + parent->setVersion (QDataStream::Qt_5_4); + } +#endif quint32 type; *parent >> type >> id_; if (type >= maximum_message_type_) diff --git a/NetworkMessage.hpp b/NetworkMessage.hpp index de18461d0..90bddb6e0 100644 --- a/NetworkMessage.hpp +++ b/NetworkMessage.hpp @@ -26,7 +26,7 @@ * * for the serialization details for each type, at the time of * writing the above document is for Qt_5_0 format which is buggy - * so we use Qt_5_2 format, differences are: + * so we use Qt_5_4 format, differences are: * * QDateTime: * QDate qint64 Julian day number @@ -49,19 +49,54 @@ * strings and null strings. Empty strings have a length of zero * whereas null strings have a length field of 0xffffffff. * - * Schema Version 1: - * ----------------- + * Schema Negotiation + * ------------------ + * + * The NetworkMessage::Builder class specifies a schema number which + * may be incremented from time to time. It represents a version of + * the underlying encoding schemes used to store data items. Since the + * underlying encoding is defined by the Qt project in it's + * QDataStream stream operators, it is essential that clients and + * servers of this protocol can agree on a common scheme. The + * NetworkMessage utility classes below exchange the schema number + * actually used. The handling of the schema is backwards compatible + * to an extent, so long as clients and servers are written + * correctly. For example a server written to any particular schema + * version can communicate with a client written to a later schema. + * + * Schema Version 1:- this schema used the QDataStream::Qt_5_0 version + * which is broken. + * + * Schema Version 2:- this schema uses the QDataStream::Qt_5_2 version. + * + * Schema Version 3:- this schema uses the QDataStream::Qt_5_4 version. + * + * * * Message Direction Value Type * ------------- --------- ---------------------- ----------- - * Heartbeat Out 0 quint32 + * Heartbeat Out/In 0 quint32 * Id (unique key) utf8 + * Maximum schema number quint32 * - * The heartbeat message is sent on a periodic basis every - * NetworkMessage::pulse seconds (see below). This message is - * intended to be used by server to detect the presence of a client - * and also the unexpected disappearance of a client. The - * message_aggregator reference server does just that. + * The heartbeat message shall be sent on a periodic basis every + * NetworkMessage::pulse seconds (see below), the WSJT-X + * application does that using the MessageClient class. This + * message is intended to be used by servers to detect the presence + * of a client and also the unexpected disappearance of a client + * and by clients to learn the schema negotiated by the server + * after it receives the initial heartbeat message from a client. + * The message_aggregator reference server does just that using the + * MessageServer class. Upon initial startup a client must send a + * heartbeat message as soon as is practical, this message is used + * to negotiate the maximum schema number common to the client and + * server. Note that the server may not be able to support the + * client's requested maximum schema number, in which case the + * first message received from the server will specify a lower + * schema number (never a higher one as that is not allowed). If a + * server replies with a lower schema number then no higher than + * that number shall be used for all further outgoing messages from + * either clients or the server itself. * * * Status Out 1 quint32 @@ -222,6 +257,7 @@ * "Send" flag is unset. Note that this API does not include a * command to determine the contents of the current free text * message. + * */ #include @@ -254,7 +290,7 @@ namespace NetworkMessage quint32 constexpr pulse {15}; // seconds // - // NetworkMessage::Build - build a message containing serialized Qt types + // NetworkMessage::Builder - build a message containing serialized Qt types // class Builder : public QDataStream @@ -263,16 +299,23 @@ namespace NetworkMessage static quint32 constexpr magic {0xadbccbda}; // never change this // increment this if a newer Qt schema is required and add decode - // logic to InputMessageStream below + // logic to the Builder and Reader class implementations +#if QT_VERSION >= 0x050400 + static quint32 constexpr schema_number {3}; +#elif QT_VERSION >= 0x050200 static quint32 constexpr schema_number {2}; +#else + // Schema 1 (Qt_5_0) is broken +#error "Qt version 5.2 or greater required" +#endif - explicit Builder (QIODevice *, Type, QString const& id); - explicit Builder (QByteArray *, Type, QString const& id); + explicit Builder (QIODevice *, Type, QString const& id, quint32 schema); + explicit Builder (QByteArray *, Type, QString const& id, quint32 schema); Builder (Builder const&) = delete; Builder& operator = (Builder const&) = delete; private: - void common_initialization (Type type, QString const& id); + void common_initialization (Type type, QString const& id, quint32 schema); }; //