mirror of
				https://github.com/saitohirga/WSJT-X.git
				synced 2025-10-26 10:30:22 -04:00 
			
		
		
		
	Implement schema negotiation for the UDP protocol
The MessageClient and MessageServer classes now agree a maximum common schema number for the protocol described in NetworkMessage.hpp. this is achieved by the client sending a Heartbeat message specifying the highest schema number supported, the server responds with messages using the minimum of its highest supported schema number and the highest schema number sent by the client in its initial Heartbeat message. This mechanism enables clients and servers built with different generations of the message schema to interoperate with minimum loss of functionality. It should be noted that messages may be extended with new fields on the end of the current definition so long as the meaning of the original fields are unchanged. Such an extension does not need the schema number to be incremented. On the other hand, using a newer version of the underlying Qt QDataStream::Version should always increment the schema number since the NetworkMessage::Builder and NetworkMessage::Reader classes need to know which QDataStream::Version to use. git-svn-id: svn+ssh://svn.code.sf.net/p/wsjt/wsjt/branches/wsjtx@5991 ab8295b8-cf94-4d9e-aec4-7959e3be5d79
This commit is contained in:
		
							parent
							
								
									d97e804347
								
							
						
					
					
						commit
						b74788ea89
					
				| @ -25,6 +25,7 @@ public: | ||||
|     : self_ {self} | ||||
|     , id_ {id} | ||||
|     , server_port_ {server_port} | ||||
|     , schema_ {2}  // use 2 prior to negotiation not 1 which is broken
 | ||||
|     , heartbeat_timer_ {new QTimer {this}} | ||||
|   { | ||||
|     connect (heartbeat_timer_, &QTimer::timeout, this, &impl::heartbeat); | ||||
| @ -57,6 +58,7 @@ public: | ||||
|   QString server_string_; | ||||
|   port_type server_port_; | ||||
|   QHostAddress server_; | ||||
|   quint32 schema_; | ||||
|   QTimer * heartbeat_timer_; | ||||
| 
 | ||||
|   // hold messages sent before host lookup completes asynchronously
 | ||||
| @ -76,6 +78,9 @@ void MessageClient::impl::host_info_results (QHostInfo host_info) | ||||
|     { | ||||
|       server_ = host_info.addresses ()[0]; | ||||
| 
 | ||||
|       // send initial heartbeat which allows schema negotiation
 | ||||
|       heartbeat (); | ||||
| 
 | ||||
|       // clear any backlog
 | ||||
|       while (pending_messages_.size ()) | ||||
|         { | ||||
| @ -107,9 +112,14 @@ void MessageClient::impl::parse_message (QByteArray const& msg) | ||||
|       // message format is described in NetworkMessage.hpp
 | ||||
|       // 
 | ||||
|       NetworkMessage::Reader in {msg}; | ||||
| 
 | ||||
|       if (OK == check_status (in) && id_ == in.id ()) // OK and for us
 | ||||
|         { | ||||
|           if (schema_ < in.schema ()) // one time record of server's
 | ||||
|                                       // negotiated schema
 | ||||
|             { | ||||
|               schema_ = in.schema (); | ||||
|             } | ||||
| 
 | ||||
|           //
 | ||||
|           // message format is described in NetworkMessage.hpp
 | ||||
|           //
 | ||||
| @ -184,7 +194,8 @@ void MessageClient::impl::heartbeat () | ||||
|    if (server_port_ && !server_.isNull ()) | ||||
|     { | ||||
|       QByteArray message; | ||||
|       NetworkMessage::Builder hb {&message, NetworkMessage::Heartbeat, id_}; | ||||
|       NetworkMessage::Builder hb {&message, NetworkMessage::Heartbeat, id_, schema_}; | ||||
|       hb << NetworkMessage::Builder::schema_number; // maximum schema number accepted
 | ||||
|       if (OK == check_status (hb)) | ||||
|         { | ||||
|           writeDatagram (message, server_, server_port_); | ||||
| @ -197,7 +208,7 @@ void MessageClient::impl::closedown () | ||||
|    if (server_port_ && !server_.isNull ()) | ||||
|     { | ||||
|       QByteArray message; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Close, id_}; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Close, id_, schema_}; | ||||
|       if (OK == check_status (out)) | ||||
|         { | ||||
|           writeDatagram (message, server_, server_port_); | ||||
| @ -227,7 +238,6 @@ auto MessageClient::impl::check_status (QDataStream const& stream) const -> Stre | ||||
|   switch (stat) | ||||
|     { | ||||
|     case QDataStream::ReadPastEnd: | ||||
|       qDebug () << __PRETTY_FUNCTION__ << " warning: short UDP message received."; | ||||
|       result = Short; | ||||
|       break; | ||||
| 
 | ||||
| @ -300,7 +310,7 @@ void MessageClient::status_update (Frequency f, QString const& mode, QString con | ||||
|   if (m_->server_port_ && !m_->server_string_.isEmpty ()) | ||||
|     { | ||||
|       QByteArray message; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Status, m_->id_}; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Status, m_->id_, m_->schema_}; | ||||
|       out << f << mode.toUtf8 () << dx_call.toUtf8 () << report.toUtf8 () << tx_mode.toUtf8 () | ||||
|           << tx_enabled << transmitting; | ||||
|       if (impl::OK == m_->check_status (out)) | ||||
| @ -320,7 +330,7 @@ void MessageClient::decode (bool is_new, QTime time, qint32 snr, float delta_tim | ||||
|    if (m_->server_port_ && !m_->server_string_.isEmpty ()) | ||||
|     { | ||||
|       QByteArray message; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Decode, m_->id_}; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Decode, m_->id_, m_->schema_}; | ||||
|       out << is_new << time << snr << delta_time << delta_frequency << mode.toUtf8 () << message_text.toUtf8 (); | ||||
|       if (impl::OK == m_->check_status (out)) | ||||
|         { | ||||
| @ -338,7 +348,7 @@ void MessageClient::clear_decodes () | ||||
|    if (m_->server_port_ && !m_->server_string_.isEmpty ()) | ||||
|     { | ||||
|       QByteArray message; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Clear, m_->id_}; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Clear, m_->id_, m_->schema_}; | ||||
|       if (impl::OK == m_->check_status (out)) | ||||
|         { | ||||
|           m_->send_message (message); | ||||
| @ -358,7 +368,7 @@ void MessageClient::qso_logged (QDateTime time, QString const& dx_call, QString | ||||
|    if (m_->server_port_ && !m_->server_string_.isEmpty ()) | ||||
|     { | ||||
|       QByteArray message; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::QSOLogged, m_->id_}; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::QSOLogged, m_->id_, m_->schema_}; | ||||
|       out << time << dx_call.toUtf8 () << dx_grid.toUtf8 () << dial_frequency << mode.toUtf8 () | ||||
|           << report_sent.toUtf8 () << report_received.toUtf8 () << tx_power.toUtf8 () << comments.toUtf8 () << name.toUtf8 (); | ||||
|       if (impl::OK == m_->check_status (out)) | ||||
|  | ||||
| @ -50,8 +50,20 @@ public: | ||||
|   static BindMode const bind_mode_; | ||||
|   struct Client | ||||
|   { | ||||
|     Client () = default; | ||||
|     Client (QHostAddress const& sender_address, port_type const& sender_port) | ||||
|       : sender_address_ {sender_address} | ||||
|       , sender_port_ {sender_port} | ||||
|       , negotiated_schema_number_ {2} // not 1 because it's broken
 | ||||
|       , last_activity_ {QDateTime::currentDateTime ()} | ||||
|     { | ||||
|     } | ||||
|     Client (Client const&) = default; | ||||
|     Client& operator= (Client const&) = default; | ||||
| 
 | ||||
|     QHostAddress sender_address_; | ||||
|     port_type sender_port_; | ||||
|     quint32 negotiated_schema_number_; | ||||
|     QDateTime last_activity_; | ||||
|   }; | ||||
|   QHash<QString, Client> clients_; // maps id to Client
 | ||||
| @ -115,16 +127,37 @@ void MessageServer::impl::parse_message (QHostAddress const& sender, port_type s | ||||
|       auto id = in.id (); | ||||
|       if (OK == check_status (in)) | ||||
|         { | ||||
|           bool new_client {false}; | ||||
|           if (!clients_.contains (id)) | ||||
|             { | ||||
|               new_client = true; | ||||
|             } | ||||
|           clients_[id] = {sender, sender_port, QDateTime::currentDateTime ()}; | ||||
|           if (new_client) | ||||
|               auto& client = (clients_[id] = {sender, sender_port}); | ||||
| 
 | ||||
|               if (NetworkMessage::Heartbeat == in.type ()) | ||||
|                 { | ||||
|                   // negotiate a working schema number
 | ||||
|                   in >> client.negotiated_schema_number_; | ||||
|                   if (OK == check_status (in)) | ||||
|                     { | ||||
|                       auto sn = NetworkMessage::Builder::schema_number; | ||||
|                       client.negotiated_schema_number_ = std::min (sn, client.negotiated_schema_number_); | ||||
| 
 | ||||
|                       // reply to the new client informing it of the
 | ||||
|                       // negotiated schema number
 | ||||
|                       QByteArray message; | ||||
|                       NetworkMessage::Builder hb {&message, NetworkMessage::Heartbeat, id, client.negotiated_schema_number_}; | ||||
|                       hb << NetworkMessage::Builder::schema_number; // maximum schema number accepted
 | ||||
|                       if (impl::OK == check_status (hb)) | ||||
|                         { | ||||
|                           writeDatagram (message, client.sender_address_, client.sender_port_); | ||||
|                         } | ||||
|                       else | ||||
|                         { | ||||
|                           Q_EMIT self_->error ("Error creating UDP message"); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|               Q_EMIT self_->client_opened (id); | ||||
|             } | ||||
|           clients_[id].last_activity_ = QDateTime::currentDateTime (); | ||||
|    | ||||
|           //
 | ||||
|           // message format is described in NetworkMessage.hpp
 | ||||
| @ -233,13 +266,18 @@ void MessageServer::impl::parse_message (QHostAddress const& sender, port_type s | ||||
| void MessageServer::impl::tick () | ||||
| { | ||||
|   auto now = QDateTime::currentDateTime (); | ||||
|   for (auto iter = std::begin (clients_); iter != std::end (clients_); ++iter) | ||||
|   auto iter = std::begin (clients_); | ||||
|   while (iter != std::end (clients_)) | ||||
|     { | ||||
|       if (now > (*iter).last_activity_.addSecs (NetworkMessage::pulse)) | ||||
|         { | ||||
|           Q_EMIT self_->clear_decodes (iter.key ()); | ||||
|           Q_EMIT self_->client_closed (iter.key ()); | ||||
|           clients_.erase (iter); // safe while iterating as doesn't rehash
 | ||||
|           iter = clients_.erase (iter); // safe while iterating as doesn't rehash
 | ||||
|         } | ||||
|       else | ||||
|         { | ||||
|           ++iter; | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @ -251,7 +289,6 @@ auto MessageServer::impl::check_status (QDataStream const& stream) const -> Stre | ||||
|   switch (stat) | ||||
|     { | ||||
|     case QDataStream::ReadPastEnd: | ||||
|       qDebug () << __PRETTY_FUNCTION__ << " warning: short UDP message received."; | ||||
|       result = Short; | ||||
|       break; | ||||
| 
 | ||||
| @ -307,7 +344,7 @@ void MessageServer::reply (QString const& id, QTime time, qint32 snr, float delt | ||||
|   if (iter != std::end (m_->clients_)) | ||||
|     { | ||||
|       QByteArray message; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Reply, id}; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Reply, id, (*iter).negotiated_schema_number_}; | ||||
|       out << time << snr << delta_time << delta_frequency << mode.toUtf8 () << message_text.toUtf8 (); | ||||
|       if (impl::OK == m_->check_status (out)) | ||||
|         { | ||||
| @ -326,7 +363,7 @@ void MessageServer::replay (QString const& id) | ||||
|   if (iter != std::end (m_->clients_)) | ||||
|     { | ||||
|       QByteArray message; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Replay, id}; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::Replay, id, (*iter).negotiated_schema_number_}; | ||||
|       if (impl::OK == m_->check_status (out)) | ||||
|         { | ||||
|           m_->writeDatagram (message, iter.value ().sender_address_, (*iter).sender_port_); | ||||
| @ -344,7 +381,7 @@ void MessageServer::halt_tx (QString const& id, bool auto_only) | ||||
|   if (iter != std::end (m_->clients_)) | ||||
|     { | ||||
|       QByteArray message; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::HaltTx, id}; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::HaltTx, id, (*iter).negotiated_schema_number_}; | ||||
|       out << auto_only; | ||||
|       if (impl::OK == m_->check_status (out)) | ||||
|         { | ||||
| @ -363,7 +400,7 @@ void MessageServer::free_text (QString const& id, QString const& text, bool send | ||||
|   if (iter != std::end (m_->clients_)) | ||||
|     { | ||||
|       QByteArray message; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::FreeText, id}; | ||||
|       NetworkMessage::Builder out {&message, NetworkMessage::FreeText, id, (*iter).negotiated_schema_number_}; | ||||
|       out << text.toUtf8 () << send; | ||||
|       if (impl::OK == m_->check_status (out)) | ||||
|         { | ||||
|  | ||||
| @ -9,23 +9,46 @@ | ||||
| 
 | ||||
| namespace NetworkMessage | ||||
| { | ||||
|   Builder::Builder (QIODevice * device, Type type, QString const& id) | ||||
|   Builder::Builder (QIODevice * device, Type type, QString const& id, quint32 schema) | ||||
|     : QDataStream {device} | ||||
|   { | ||||
|     common_initialization (type, id); | ||||
|     common_initialization (type, id, schema); | ||||
|   } | ||||
| 
 | ||||
|   Builder::Builder (QByteArray * a, Type type, QString const& id) | ||||
|   Builder::Builder (QByteArray * a, Type type, QString const& id, quint32 schema) | ||||
|     : QDataStream {a, QIODevice::WriteOnly} | ||||
|   { | ||||
|     common_initialization (type, id); | ||||
|     common_initialization (type, id, schema); | ||||
|   } | ||||
| 
 | ||||
|   void Builder::common_initialization (Type type, QString const& id) | ||||
|   void Builder::common_initialization (Type type, QString const& id, quint32 schema) | ||||
|   { | ||||
|     if (schema <= 1) | ||||
|       { | ||||
|         setVersion (QDataStream::Qt_5_0); // Qt schema version
 | ||||
|       } | ||||
| #if QT_VERSION >= 0x050200 | ||||
|     else if (schema <= 2) | ||||
|       { | ||||
|     *this << magic; | ||||
|     *this << schema_number; | ||||
|         setVersion (QDataStream::Qt_5_2); // Qt schema version
 | ||||
|       } | ||||
| #endif | ||||
| #if QT_VERSION >= 0x050400 | ||||
|     else if (schema <= 3) | ||||
|       { | ||||
|         setVersion (QDataStream::Qt_5_4); // Qt schema version
 | ||||
|       } | ||||
| #endif | ||||
|     else | ||||
|       { | ||||
|         throw std::runtime_error {"Unrecognized message schema"}; | ||||
|       } | ||||
| 
 | ||||
|     // the following two items assume that the quint32 encoding is
 | ||||
|     // unchanged over QDataStream versions
 | ||||
|     *this << magic; | ||||
|     *this << schema; | ||||
| 
 | ||||
|     *this << static_cast<quint32> (type) << id.toUtf8 (); | ||||
|   } | ||||
| 
 | ||||
| @ -49,10 +72,18 @@ namespace NetworkMessage | ||||
|         { | ||||
|           parent->setVersion (QDataStream::Qt_5_0); | ||||
|         } | ||||
| #if QT_VERSION >= 0x050200 | ||||
|       else if (schema_ <= 2) | ||||
|         { | ||||
|           parent->setVersion (QDataStream::Qt_5_2); | ||||
|         } | ||||
| #endif | ||||
| #if QT_VERSION >= 0x050400 | ||||
|       else if (schema_ <= 3) | ||||
|         { | ||||
|           parent->setVersion (QDataStream::Qt_5_4); | ||||
|         } | ||||
| #endif | ||||
|       quint32 type; | ||||
|       *parent >> type >> id_; | ||||
|       if (type >= maximum_message_type_) | ||||
|  | ||||
| @ -26,7 +26,7 @@ | ||||
|  * | ||||
|  *      for the serialization details for each type, at the time of | ||||
|  *      writing the above document is for Qt_5_0 format which is buggy | ||||
|  *      so we use Qt_5_2 format, differences are: | ||||
|  *      so we use Qt_5_4 format, differences are: | ||||
|  * | ||||
|  *      QDateTime: | ||||
|  *           QDate      qint64    Julian day number | ||||
| @ -49,19 +49,54 @@ | ||||
|  * strings and null strings. Empty strings have a length of zero | ||||
|  * whereas null strings have a length field of 0xffffffff. | ||||
|  * | ||||
|  * Schema Version 1: | ||||
|  * ----------------- | ||||
|  * Schema Negotiation | ||||
|  * ------------------ | ||||
|  * | ||||
|  * The NetworkMessage::Builder  class specifies a schema  number which | ||||
|  * may be  incremented from time to  time. It represents a  version of | ||||
|  * the underlying encoding schemes used to store data items. Since the | ||||
|  * underlying  encoding  is   defined  by  the  Qt   project  in  it's | ||||
|  * QDataStream  stream operators,  it  is essential  that clients  and | ||||
|  * servers  of  this protocol  can  agree  on  a common  scheme.   The | ||||
|  * NetworkMessage  utility classes  below exchange  the schema  number | ||||
|  * actually used.  The handling of  the schema is backwards compatible | ||||
|  * to  an  extent,  so  long   as  clients  and  servers  are  written | ||||
|  * correctly. For  example a server  written to any  particular schema | ||||
|  * version can communicate with a client written to a later schema. | ||||
|  * | ||||
|  * Schema Version 1:- this schema used the QDataStream::Qt_5_0 version | ||||
|  * 	which is broken. | ||||
|  * | ||||
|  * Schema Version 2:- this schema uses the QDataStream::Qt_5_2 version. | ||||
|  * | ||||
|  * Schema Version 3:- this schema uses the QDataStream::Qt_5_4 version. | ||||
|  * | ||||
|  * | ||||
|  * | ||||
|  * Message       Direction Value                  Type | ||||
|  * ------------- --------- ---------------------- ----------- | ||||
|  * Heartbeat     Out       0                      quint32 | ||||
|  * Heartbeat     Out/In    0                      quint32 | ||||
|  *                         Id (unique key)        utf8 | ||||
|  *                         Maximum schema number  quint32 | ||||
|  * | ||||
|  *		The  heartbeat  message  is  sent  on  a  periodic  basis  every | ||||
|  *		NetworkMessage::pulse  seconds  (see  below).  This  message  is | ||||
|  *		intended to be used by server to detect the presence of a client | ||||
|  *		and  also   the  unexpected  disappearance  of   a  client.  The | ||||
|  *		message_aggregator reference server does just that. | ||||
|  *		The heartbeat  message shall be  sent on a periodic  basis every | ||||
|  *		NetworkMessage::pulse   seconds   (see    below),   the   WSJT-X | ||||
|  *		application  does  that  using the  MessageClient  class.   This | ||||
|  *		message is intended to be used by servers to detect the presence | ||||
|  *		of a  client and also  the unexpected disappearance of  a client | ||||
|  *		and  by clients  to learn  the schema  negotiated by  the server | ||||
|  *		after it receives  the initial heartbeat message  from a client. | ||||
|  *		The message_aggregator reference server does just that using the | ||||
|  *		MessageServer class. Upon  initial startup a client  must send a | ||||
|  *		heartbeat message as soon as  is practical, this message is used | ||||
|  *		to negotiate the maximum schema  number common to the client and | ||||
|  *		server. Note  that the  server may  not be  able to  support the | ||||
|  *		client's  requested maximum  schema  number, in  which case  the | ||||
|  *		first  message received  from the  server will  specify a  lower | ||||
|  *		schema number (never a higher one  as that is not allowed). If a | ||||
|  *		server replies  with a lower  schema number then no  higher than | ||||
|  *		that number shall be used for all further outgoing messages from | ||||
|  *		either clients or the server itself. | ||||
|  * | ||||
|  * | ||||
|  * Status        Out       1                      quint32 | ||||
| @ -222,6 +257,7 @@ | ||||
|  *			"Send" flag is  unset.  Note that this API does  not include a | ||||
|  *			command to  determine the  contents of  the current  free text | ||||
|  *			message. | ||||
|  * | ||||
|  */ | ||||
| 
 | ||||
| #include <QDataStream> | ||||
| @ -254,7 +290,7 @@ namespace NetworkMessage | ||||
|   quint32 constexpr pulse {15}; // seconds
 | ||||
| 
 | ||||
|   //
 | ||||
|   // NetworkMessage::Build - build a message containing serialized Qt types
 | ||||
|   // NetworkMessage::Builder - build a message containing serialized Qt types
 | ||||
|   //
 | ||||
|   class Builder | ||||
|     : public QDataStream | ||||
| @ -263,16 +299,23 @@ namespace NetworkMessage | ||||
|     static quint32 constexpr magic {0xadbccbda}; // never change this
 | ||||
| 
 | ||||
|     // increment this if a newer Qt schema is required and add decode
 | ||||
|     // logic to InputMessageStream below
 | ||||
|     // logic to the Builder and Reader class implementations
 | ||||
| #if QT_VERSION >= 0x050400 | ||||
|     static quint32 constexpr schema_number {3}; | ||||
| #elif QT_VERSION >= 0x050200 | ||||
|     static quint32 constexpr schema_number {2}; | ||||
| #else | ||||
|     // Schema 1 (Qt_5_0) is broken
 | ||||
| #error "Qt version 5.2 or greater required" | ||||
| #endif | ||||
| 
 | ||||
|     explicit Builder (QIODevice *, Type, QString const& id); | ||||
|     explicit Builder (QByteArray *, Type, QString const& id); | ||||
|     explicit Builder (QIODevice *, Type, QString const& id, quint32 schema); | ||||
|     explicit Builder (QByteArray *, Type, QString const& id, quint32 schema); | ||||
|     Builder (Builder const&) = delete; | ||||
|     Builder& operator = (Builder const&) = delete; | ||||
| 
 | ||||
|   private: | ||||
|     void common_initialization (Type type, QString const& id); | ||||
|     void common_initialization (Type type, QString const& id, quint32 schema); | ||||
|   }; | ||||
| 
 | ||||
|   //
 | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user