Implement schema negotiation for the UDP protocol

The MessageClient and MessageServer classes now agree a maximum common
schema number  for the protocol described  in NetworkMessage.hpp. this
is achieved by  the client sending a Heartbeat  message specifying the
highest  schema number  supported, the  server responds  with messages
using  the minimum  of its  highest  supported schema  number and  the
highest  schema number  sent by  the client  in its  initial Heartbeat
message.   This  mechanism  enables  clients and  servers  built  with
different  generations  of the  message  schema  to interoperate  with
minimum loss of functionality.

It should  be noted that messages  may be extended with  new fields on
the  end of  the current  definition  so long  as the  meaning of  the
original fields  are unchanged.  Such  an extension does not  need the
schema number  to be  incremented. On  the other  hand, using  a newer
version  of  the  underlying  Qt  QDataStream::Version  should  always
increment  the schema  number  since  the NetworkMessage::Builder  and
NetworkMessage::Reader classes need to know which QDataStream::Version
to use.

git-svn-id: svn+ssh://svn.code.sf.net/p/wsjt/wsjt/branches/wsjtx@5991 ab8295b8-cf94-4d9e-aec4-7959e3be5d79
This commit is contained in:
Bill Somerville 2015-10-21 18:56:29 +00:00
parent 8479b6c2b6
commit 297e4c8755
4 changed files with 163 additions and 42 deletions

View File

@ -25,6 +25,7 @@ public:
: self_ {self}
, id_ {id}
, server_port_ {server_port}
, schema_ {2} // use 2 prior to negotiation not 1 which is broken
, heartbeat_timer_ {new QTimer {this}}
{
connect (heartbeat_timer_, &QTimer::timeout, this, &impl::heartbeat);
@ -57,6 +58,7 @@ public:
QString server_string_;
port_type server_port_;
QHostAddress server_;
quint32 schema_;
QTimer * heartbeat_timer_;
// hold messages sent before host lookup completes asynchronously
@ -76,6 +78,9 @@ void MessageClient::impl::host_info_results (QHostInfo host_info)
{
server_ = host_info.addresses ()[0];
// send initial heartbeat which allows schema negotiation
heartbeat ();
// clear any backlog
while (pending_messages_.size ())
{
@ -107,9 +112,14 @@ void MessageClient::impl::parse_message (QByteArray const& msg)
// message format is described in NetworkMessage.hpp
//
NetworkMessage::Reader in {msg};
if (OK == check_status (in) && id_ == in.id ()) // OK and for us
{
if (schema_ < in.schema ()) // one time record of server's
// negotiated schema
{
schema_ = in.schema ();
}
//
// message format is described in NetworkMessage.hpp
//
@ -184,7 +194,8 @@ void MessageClient::impl::heartbeat ()
if (server_port_ && !server_.isNull ())
{
QByteArray message;
NetworkMessage::Builder hb {&message, NetworkMessage::Heartbeat, id_};
NetworkMessage::Builder hb {&message, NetworkMessage::Heartbeat, id_, schema_};
hb << NetworkMessage::Builder::schema_number; // maximum schema number accepted
if (OK == check_status (hb))
{
writeDatagram (message, server_, server_port_);
@ -197,7 +208,7 @@ void MessageClient::impl::closedown ()
if (server_port_ && !server_.isNull ())
{
QByteArray message;
NetworkMessage::Builder out {&message, NetworkMessage::Close, id_};
NetworkMessage::Builder out {&message, NetworkMessage::Close, id_, schema_};
if (OK == check_status (out))
{
writeDatagram (message, server_, server_port_);
@ -227,7 +238,6 @@ auto MessageClient::impl::check_status (QDataStream const& stream) const -> Stre
switch (stat)
{
case QDataStream::ReadPastEnd:
qDebug () << __PRETTY_FUNCTION__ << " warning: short UDP message received.";
result = Short;
break;
@ -300,7 +310,7 @@ void MessageClient::status_update (Frequency f, QString const& mode, QString con
if (m_->server_port_ && !m_->server_string_.isEmpty ())
{
QByteArray message;
NetworkMessage::Builder out {&message, NetworkMessage::Status, m_->id_};
NetworkMessage::Builder out {&message, NetworkMessage::Status, m_->id_, m_->schema_};
out << f << mode.toUtf8 () << dx_call.toUtf8 () << report.toUtf8 () << tx_mode.toUtf8 ()
<< tx_enabled << transmitting;
if (impl::OK == m_->check_status (out))
@ -320,7 +330,7 @@ void MessageClient::decode (bool is_new, QTime time, qint32 snr, float delta_tim
if (m_->server_port_ && !m_->server_string_.isEmpty ())
{
QByteArray message;
NetworkMessage::Builder out {&message, NetworkMessage::Decode, m_->id_};
NetworkMessage::Builder out {&message, NetworkMessage::Decode, m_->id_, m_->schema_};
out << is_new << time << snr << delta_time << delta_frequency << mode.toUtf8 () << message_text.toUtf8 ();
if (impl::OK == m_->check_status (out))
{
@ -338,7 +348,7 @@ void MessageClient::clear_decodes ()
if (m_->server_port_ && !m_->server_string_.isEmpty ())
{
QByteArray message;
NetworkMessage::Builder out {&message, NetworkMessage::Clear, m_->id_};
NetworkMessage::Builder out {&message, NetworkMessage::Clear, m_->id_, m_->schema_};
if (impl::OK == m_->check_status (out))
{
m_->send_message (message);
@ -358,7 +368,7 @@ void MessageClient::qso_logged (QDateTime time, QString const& dx_call, QString
if (m_->server_port_ && !m_->server_string_.isEmpty ())
{
QByteArray message;
NetworkMessage::Builder out {&message, NetworkMessage::QSOLogged, m_->id_};
NetworkMessage::Builder out {&message, NetworkMessage::QSOLogged, m_->id_, m_->schema_};
out << time << dx_call.toUtf8 () << dx_grid.toUtf8 () << dial_frequency << mode.toUtf8 ()
<< report_sent.toUtf8 () << report_received.toUtf8 () << tx_power.toUtf8 () << comments.toUtf8 () << name.toUtf8 ();
if (impl::OK == m_->check_status (out))

View File

@ -50,8 +50,20 @@ public:
static BindMode const bind_mode_;
struct Client
{
Client () = default;
Client (QHostAddress const& sender_address, port_type const& sender_port)
: sender_address_ {sender_address}
, sender_port_ {sender_port}
, negotiated_schema_number_ {2} // not 1 because it's broken
, last_activity_ {QDateTime::currentDateTime ()}
{
}
Client (Client const&) = default;
Client& operator= (Client const&) = default;
QHostAddress sender_address_;
port_type sender_port_;
quint32 negotiated_schema_number_;
QDateTime last_activity_;
};
QHash<QString, Client> clients_; // maps id to Client
@ -115,16 +127,37 @@ void MessageServer::impl::parse_message (QHostAddress const& sender, port_type s
auto id = in.id ();
if (OK == check_status (in))
{
bool new_client {false};
if (!clients_.contains (id))
{
new_client = true;
}
clients_[id] = {sender, sender_port, QDateTime::currentDateTime ()};
if (new_client)
{
auto& client = (clients_[id] = {sender, sender_port});
if (NetworkMessage::Heartbeat == in.type ())
{
// negotiate a working schema number
in >> client.negotiated_schema_number_;
if (OK == check_status (in))
{
auto sn = NetworkMessage::Builder::schema_number;
client.negotiated_schema_number_ = std::min (sn, client.negotiated_schema_number_);
// reply to the new client informing it of the
// negotiated schema number
QByteArray message;
NetworkMessage::Builder hb {&message, NetworkMessage::Heartbeat, id, client.negotiated_schema_number_};
hb << NetworkMessage::Builder::schema_number; // maximum schema number accepted
if (impl::OK == check_status (hb))
{
writeDatagram (message, client.sender_address_, client.sender_port_);
}
else
{
Q_EMIT self_->error ("Error creating UDP message");
}
}
}
Q_EMIT self_->client_opened (id);
}
clients_[id].last_activity_ = QDateTime::currentDateTime ();
//
// message format is described in NetworkMessage.hpp
@ -233,13 +266,18 @@ void MessageServer::impl::parse_message (QHostAddress const& sender, port_type s
void MessageServer::impl::tick ()
{
auto now = QDateTime::currentDateTime ();
for (auto iter = std::begin (clients_); iter != std::end (clients_); ++iter)
auto iter = std::begin (clients_);
while (iter != std::end (clients_))
{
if (now > (*iter).last_activity_.addSecs (NetworkMessage::pulse))
{
Q_EMIT self_->clear_decodes (iter.key ());
Q_EMIT self_->client_closed (iter.key ());
clients_.erase (iter); // safe while iterating as doesn't rehash
iter = clients_.erase (iter); // safe while iterating as doesn't rehash
}
else
{
++iter;
}
}
}
@ -251,7 +289,6 @@ auto MessageServer::impl::check_status (QDataStream const& stream) const -> Stre
switch (stat)
{
case QDataStream::ReadPastEnd:
qDebug () << __PRETTY_FUNCTION__ << " warning: short UDP message received.";
result = Short;
break;
@ -307,7 +344,7 @@ void MessageServer::reply (QString const& id, QTime time, qint32 snr, float delt
if (iter != std::end (m_->clients_))
{
QByteArray message;
NetworkMessage::Builder out {&message, NetworkMessage::Reply, id};
NetworkMessage::Builder out {&message, NetworkMessage::Reply, id, (*iter).negotiated_schema_number_};
out << time << snr << delta_time << delta_frequency << mode.toUtf8 () << message_text.toUtf8 ();
if (impl::OK == m_->check_status (out))
{
@ -326,7 +363,7 @@ void MessageServer::replay (QString const& id)
if (iter != std::end (m_->clients_))
{
QByteArray message;
NetworkMessage::Builder out {&message, NetworkMessage::Replay, id};
NetworkMessage::Builder out {&message, NetworkMessage::Replay, id, (*iter).negotiated_schema_number_};
if (impl::OK == m_->check_status (out))
{
m_->writeDatagram (message, iter.value ().sender_address_, (*iter).sender_port_);
@ -344,7 +381,7 @@ void MessageServer::halt_tx (QString const& id, bool auto_only)
if (iter != std::end (m_->clients_))
{
QByteArray message;
NetworkMessage::Builder out {&message, NetworkMessage::HaltTx, id};
NetworkMessage::Builder out {&message, NetworkMessage::HaltTx, id, (*iter).negotiated_schema_number_};
out << auto_only;
if (impl::OK == m_->check_status (out))
{
@ -363,7 +400,7 @@ void MessageServer::free_text (QString const& id, QString const& text, bool send
if (iter != std::end (m_->clients_))
{
QByteArray message;
NetworkMessage::Builder out {&message, NetworkMessage::FreeText, id};
NetworkMessage::Builder out {&message, NetworkMessage::FreeText, id, (*iter).negotiated_schema_number_};
out << text.toUtf8 () << send;
if (impl::OK == m_->check_status (out))
{

View File

@ -9,23 +9,46 @@
namespace NetworkMessage
{
Builder::Builder (QIODevice * device, Type type, QString const& id)
Builder::Builder (QIODevice * device, Type type, QString const& id, quint32 schema)
: QDataStream {device}
{
common_initialization (type, id);
common_initialization (type, id, schema);
}
Builder::Builder (QByteArray * a, Type type, QString const& id)
Builder::Builder (QByteArray * a, Type type, QString const& id, quint32 schema)
: QDataStream {a, QIODevice::WriteOnly}
{
common_initialization (type, id);
common_initialization (type, id, schema);
}
void Builder::common_initialization (Type type, QString const& id)
void Builder::common_initialization (Type type, QString const& id, quint32 schema)
{
if (schema <= 1)
{
setVersion (QDataStream::Qt_5_0); // Qt schema version
}
#if QT_VERSION >= 0x050200
else if (schema <= 2)
{
setVersion (QDataStream::Qt_5_2); // Qt schema version
}
#endif
#if QT_VERSION >= 0x050400
else if (schema <= 3)
{
setVersion (QDataStream::Qt_5_4); // Qt schema version
}
#endif
else
{
throw std::runtime_error {"Unrecognized message schema"};
}
// the following two items assume that the quint32 encoding is
// unchanged over QDataStream versions
*this << magic;
*this << schema_number;
setVersion (QDataStream::Qt_5_2); // Qt schema version
*this << schema;
*this << static_cast<quint32> (type) << id.toUtf8 ();
}
@ -49,10 +72,18 @@ namespace NetworkMessage
{
parent->setVersion (QDataStream::Qt_5_0);
}
#if QT_VERSION >= 0x050200
else if (schema_ <= 2)
{
parent->setVersion (QDataStream::Qt_5_2);
}
#endif
#if QT_VERSION >= 0x050400
else if (schema_ <= 3)
{
parent->setVersion (QDataStream::Qt_5_4);
}
#endif
quint32 type;
*parent >> type >> id_;
if (type >= maximum_message_type_)

View File

@ -26,7 +26,7 @@
*
* for the serialization details for each type, at the time of
* writing the above document is for Qt_5_0 format which is buggy
* so we use Qt_5_2 format, differences are:
* so we use Qt_5_4 format, differences are:
*
* QDateTime:
* QDate qint64 Julian day number
@ -49,19 +49,54 @@
* strings and null strings. Empty strings have a length of zero
* whereas null strings have a length field of 0xffffffff.
*
* Schema Version 1:
* -----------------
* Schema Negotiation
* ------------------
*
* The NetworkMessage::Builder class specifies a schema number which
* may be incremented from time to time. It represents a version of
* the underlying encoding schemes used to store data items. Since the
* underlying encoding is defined by the Qt project in it's
* QDataStream stream operators, it is essential that clients and
* servers of this protocol can agree on a common scheme. The
* NetworkMessage utility classes below exchange the schema number
* actually used. The handling of the schema is backwards compatible
* to an extent, so long as clients and servers are written
* correctly. For example a server written to any particular schema
* version can communicate with a client written to a later schema.
*
* Schema Version 1:- this schema used the QDataStream::Qt_5_0 version
* which is broken.
*
* Schema Version 2:- this schema uses the QDataStream::Qt_5_2 version.
*
* Schema Version 3:- this schema uses the QDataStream::Qt_5_4 version.
*
*
*
* Message Direction Value Type
* ------------- --------- ---------------------- -----------
* Heartbeat Out 0 quint32
* Heartbeat Out/In 0 quint32
* Id (unique key) utf8
* Maximum schema number quint32
*
* The heartbeat message is sent on a periodic basis every
* NetworkMessage::pulse seconds (see below). This message is
* intended to be used by server to detect the presence of a client
* and also the unexpected disappearance of a client. The
* message_aggregator reference server does just that.
* The heartbeat message shall be sent on a periodic basis every
* NetworkMessage::pulse seconds (see below), the WSJT-X
* application does that using the MessageClient class. This
* message is intended to be used by servers to detect the presence
* of a client and also the unexpected disappearance of a client
* and by clients to learn the schema negotiated by the server
* after it receives the initial heartbeat message from a client.
* The message_aggregator reference server does just that using the
* MessageServer class. Upon initial startup a client must send a
* heartbeat message as soon as is practical, this message is used
* to negotiate the maximum schema number common to the client and
* server. Note that the server may not be able to support the
* client's requested maximum schema number, in which case the
* first message received from the server will specify a lower
* schema number (never a higher one as that is not allowed). If a
* server replies with a lower schema number then no higher than
* that number shall be used for all further outgoing messages from
* either clients or the server itself.
*
*
* Status Out 1 quint32
@ -222,6 +257,7 @@
* "Send" flag is unset. Note that this API does not include a
* command to determine the contents of the current free text
* message.
*
*/
#include <QDataStream>
@ -254,7 +290,7 @@ namespace NetworkMessage
quint32 constexpr pulse {15}; // seconds
//
// NetworkMessage::Build - build a message containing serialized Qt types
// NetworkMessage::Builder - build a message containing serialized Qt types
//
class Builder
: public QDataStream
@ -263,16 +299,23 @@ namespace NetworkMessage
static quint32 constexpr magic {0xadbccbda}; // never change this
// increment this if a newer Qt schema is required and add decode
// logic to InputMessageStream below
// logic to the Builder and Reader class implementations
#if QT_VERSION >= 0x050400
static quint32 constexpr schema_number {3};
#elif QT_VERSION >= 0x050200
static quint32 constexpr schema_number {2};
#else
// Schema 1 (Qt_5_0) is broken
#error "Qt version 5.2 or greater required"
#endif
explicit Builder (QIODevice *, Type, QString const& id);
explicit Builder (QByteArray *, Type, QString const& id);
explicit Builder (QIODevice *, Type, QString const& id, quint32 schema);
explicit Builder (QByteArray *, Type, QString const& id, quint32 schema);
Builder (Builder const&) = delete;
Builder& operator = (Builder const&) = delete;
private:
void common_initialization (Type type, QString const& id);
void common_initialization (Type type, QString const& id, quint32 schema);
};
//