Reworked messaging lib

This patch updates the messaging lib to use Message Objects
for each message type (text, ack) that know how to send
themselves with the same interface.
This commit is contained in:
Hemna 2020-12-23 13:12:04 -05:00
parent 1d898ea20f
commit 9768003c2a
2 changed files with 297 additions and 190 deletions

View File

@ -21,11 +21,15 @@
#
# python included libs
import concurrent.futures
import functools
import logging
import os
import queue
import random
import signal
import sys
import threading
import time
from logging import NullHandler
from logging.handlers import RotatingFileHandler
@ -53,6 +57,10 @@ LOG_LEVELS = {
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
# Global threading event to trigger stopping all threads
# When user quits app via CTRL-C
event = None
# localization, please edit:
# HOST = "noam.aprs2.net" # north america tier2 servers round robin
# USER = "KM6XXX-9" # callsign of this aprs client with SSID
@ -140,11 +148,10 @@ def install(append, case_insensitive, shell, path):
def signal_handler(signal, frame):
LOG.info("Ctrl+C, exiting.")
# sys.exit(0) # thread ignores this
os._exit(0)
global event
LOG.info("Ctrl+C, Sending all threads exit!")
sys.exit(0) # thread ignores this
# end signal_handler
@ -172,7 +179,7 @@ def setup_logging(config, loglevel, quiet):
LOG.addHandler(sh)
def process_ack_packet(packet):
def process_ack_packet(packet, config):
ack_num = packet.get("msgNo")
LOG.info("Got ack for message {}".format(ack_num))
messaging.log_message(
@ -182,32 +189,30 @@ def process_ack_packet(packet):
return
def process_mic_e_packet(packet):
def process_mic_e_packet(packet, config):
LOG.info("Mic-E Packet detected. Currenlty unsupported.")
messaging.log_packet(packet)
return
def process_message_packet(packet):
def process_message_packet(packet, config, tx_msg_queue):
LOG.info("Got a message packet")
fromcall = packet["from"]
message = packet.get("message_text", None)
msg_number = packet.get("msgNo", None)
if msg_number:
ack = msg_number
else:
ack = "0"
msg_id = packet.get("msgNo", None)
if not msg_id:
msg_id = "0"
messaging.log_message(
"Received Message", packet["raw"], message, fromcall=fromcall, ack=ack
"Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id
)
found_command = False
# Get singleton of the PM
pm = plugin.PluginManager()
try:
results = pm.run(fromcall=fromcall, message=message, ack=ack)
results = pm.run(fromcall=fromcall, message=message, ack=msg_id)
for reply in results:
found_command = True
# A plugin can return a null message flag which signals
@ -215,7 +220,11 @@ def process_message_packet(packet):
# nothing to reply with, so we avoid replying with a usage string
if reply is not messaging.NULL_MESSAGE:
LOG.debug("Sending '{}'".format(reply))
messaging.send_message(fromcall, reply)
# msg = {"fromcall": fromcall, "msg": reply}
msg = messaging.TextMessage(config["aprs"]["login"],
fromcall, reply)
tx_msg_queue.put(msg)
else:
LOG.debug("Got NULL MESSAGE from plugin")
@ -225,22 +234,29 @@ def process_message_packet(packet):
names.sort()
reply = "Usage: {}".format(", ".join(names))
messaging.send_message(fromcall, reply)
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(config["aprs"]["login"],
fromcall, reply)
tx_msg_queue.put(msg)
except Exception as ex:
LOG.exception("Plugin failed!!!", ex)
reply = "A Plugin failed! try again?"
messaging.send_message(fromcall, reply)
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(config["aprs"]["login"],
fromcall, reply)
tx_msg_queue.put(msg)
# let any threads do their thing, then ack
# send an ack last
messaging.send_ack(fromcall, ack)
ack = messaging.AckMessage(config["aprs"]["login"], fromcall, msg_id=msg_id)
ack.send()
LOG.debug("Packet processing complete")
def process_packet(packet):
def process_packet(packet, config=None, msg_queues=None, event=None):
"""Process a packet recieved from aprs-is server."""
LOG.debug("Process packet!")
LOG.debug("Process packet! {}".format(msg_queues))
try:
LOG.debug("Got message: {}".format(packet))
@ -250,15 +266,15 @@ def process_packet(packet):
if msg_format == "message" and msg:
# we want to send the message through the
# plugins
process_message_packet(packet)
process_message_packet(packet, config, msg_queues["tx"])
return
elif msg_response == "ack":
process_ack_packet(packet)
process_ack_packet(packet, config)
return
if msg_format == "mic-e":
# process a mic-e packet
process_mic_e_packet(packet)
process_mic_e_packet(packet, config)
return
except (aprslib.ParseError, aprslib.UnknownFormat) as exp:
@ -350,17 +366,15 @@ def send_message(
message = packet.get("message_text", None)
LOG.info("We got a new message")
fromcall = packet["from"]
msg_number = packet.get("msgNo", None)
if msg_number:
ack = msg_number
else:
ack = "0"
msg_number = packet.get("msgNo", "0")
messaging.log_message(
"Received Message", packet["raw"], message, fromcall=fromcall, ack=ack
"Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_number
)
got_response = True
# Send the ack back?
messaging.send_ack_direct(fromcall, ack)
ack = messaging.AckMessage(config["aprs"]["login"],
fromcall, msg_id=msg_number)
ack.send_direct()
if got_ack and got_response:
sys.exit(0)
@ -372,7 +386,9 @@ def send_message(
# We should get an ack back as well as a new message
# we should bail after we get the ack and send an ack back for the
# message
messaging.send_message_direct(tocallsign, command, message_number)
msg = messaging.TextMessage(aprs_login, tocallsign, command)
msg.send_direct()
#messaging.send_message_direct(tocallsign, command, message_number)
try:
# This will register a packet consumer with aprslib
@ -389,6 +405,16 @@ def send_message(
cl.reset()
def tx_msg_thread(msg_queues=None, event=None):
"""Thread to handle sending any messages outbound."""
LOG.info("TX_MSG_THREAD")
while not event.is_set() or not msg_queues["tx"].empty():
msg = msg_queues["tx"].get()
LOG.info("TXQ: got message '{}'".format(msg))
msg.send()
#messaging.send_message(msg["fromcall"], msg["msg"])
# main() ###
@main.command()
@click.option(
@ -418,7 +444,9 @@ def send_message(
)
def server(loglevel, quiet, disable_validation, config_file):
"""Start the aprsd server process."""
global event
event = threading.Event()
signal.signal(signal.SIGINT, signal_handler)
click.echo("Load config")
@ -429,7 +457,6 @@ def server(loglevel, quiet, disable_validation, config_file):
# Accept the config as a constructor param, instead of this
# hacky global setting
email.CONFIG = config
messaging.CONFIG = config
setup_logging(config, loglevel, quiet)
LOG.info("APRSD Started version: {}".format(aprsd.__version__))
@ -449,7 +476,22 @@ def server(loglevel, quiet, disable_validation, config_file):
plugin_manager.setup_plugins()
cl = client.Client(config)
# setup and run the main blocking loop
rx_msg_queue = queue.Queue(maxsize=20)
tx_msg_queue = queue.Queue(maxsize=20)
msg_queues = {"rx": rx_msg_queue,
"tx": tx_msg_queue,}
rx_msg = threading.Thread(
target=rx_msg_thread, name="RX_msg", kwargs={'msg_queues':msg_queues,
'event': event}
)
tx_msg = threading.Thread(
target=tx_msg_thread, name="TX_msg", kwargs={'msg_queues':msg_queues,
'event': event}
)
rx_msg.start()
tx_msg.start()
while True:
# Now use the helper which uses the singleton
aprs_client = client.get_client()
@ -459,14 +501,30 @@ def server(loglevel, quiet, disable_validation, config_file):
# This will register a packet consumer with aprslib
# When new packets come in the consumer will process
# the packet
aprs_client.consumer(process_packet, raw=False)
# Do a partial here because the consumer signature doesn't allow
# For kwargs to be passed in to the consumer func we declare
# and the aprslib developer didn't want to allow a PR to add
# kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56
process_partial = functools.partial(process_packet,
msg_queues=msg_queues,
event=event, config=config)
aprs_client.consumer(process_partial, raw=False)
except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped, reconnecting")
time.sleep(5)
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()
# is called
cl.reset()
client.Client().reset()
LOG.info("APRSD Exiting.")
tx_msg.join()
sys.exit(0)
# setup and run the main blocking loop
if __name__ == "__main__":

View File

@ -1,161 +1,239 @@
import abc
import logging
from multiprocessing import RawValue
import pprint
import re
import threading
import time
import uuid
from aprsd import client
LOG = logging.getLogger("APRSD")
CONFIG = None
# current aprs radio message number, increments for each message we
# send over rf {int}
message_number = 0
# message_nubmer:ack combos so we stop sending a message after an
# ack from radio {int:int}
# FIXME
ack_dict = {}
# What to return from a plugin if we have processed the message
# and it's ok, but don't send a usage string back
NULL_MESSAGE = -1
class MessageCounter(object):
"""
Global message id counter class.
def send_ack_thread(tocall, ack, retry_count):
cl = client.get_client()
tocall = tocall.ljust(9) # pad to nine chars
line = "{}>APRS::{}:ack{}\n".format(CONFIG["aprs"]["login"], tocall, ack)
for i in range(retry_count, 0, -1):
log_message(
"Sending ack",
line.rstrip("\n"),
None,
ack=ack,
tocall=tocall,
retry_number=i,
)
cl.sendall(line)
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
time.sleep(31)
# end_send_ack_thread
This is a singleton based class that keeps
an incrementing counter for all messages to
be sent. All new Message objects gets a new
message id, which is the next number available
from the MessageCounter.
"""
_instance = None
def __new__(cls, *args, **kwargs):
"""Make this a singleton class."""
if cls._instance is None:
cls._instance = super(MessageCounter, cls).__new__(cls)
cls._instance.val = RawValue('i', 1)
cls._instance.lock = threading.Lock()
return cls._instance
def increment(self):
with self.lock:
self.val.value += 1
@property
def value(self):
with self.lock:
return self.val.value
def __repr__(self):
with self.lock:
return str(self.val.value)
def __str__(self):
with self.lock:
return str(self.val.value)
def send_ack(tocall, ack):
LOG.debug("Send ACK({}:{}) to radio.".format(tocall, ack))
class Message(object, metaclass=abc.ABCMeta):
"""Base Message Class."""
# The message id to send over the air
id = 0
# Unique identifier for this message
uuid = None
sent = False
sent_time = None
acked = False
acked_time = None
retry_count = 3
thread = threading.Thread(
target=send_ack_thread, name="send_ack", args=(tocall, ack, retry_count)
)
thread.start()
# end send_ack()
def __init__(self, fromcall, tocall, msg_id=None):
self.fromcall = fromcall
self.tocall = tocall
self.uuid = uuid.uuid4()
if not msg_id:
c = MessageCounter()
c.increment()
msg_id = c.value
self.id = msg_id
@abc.abstractmethod
def send(self):
"""Child class must declare."""
pass
def send_ack_direct(tocall, ack):
"""Send an ack message without a separate thread."""
LOG.debug("Send ACK({}:{}) to radio.".format(tocall, ack))
cl = client.get_client()
fromcall = CONFIG["aprs"]["login"]
line = "{}>APRS::{}:ack{}\n".format(fromcall, tocall, ack)
log_message(
"Sending ack",
line.rstrip("\n"),
None,
ack=ack,
tocall=tocall,
fromcall=fromcall,
)
cl.sendall(line)
class TextMessage(Message):
"""Send regular ARPS text/command messages/replies."""
message = None
def __init__(self, fromcall, tocall, message):
super(TextMessage, self).__init__(fromcall, tocall)
self.message = message
def __repr__(self):
"""Build raw string to send over the air."""
return "{}>APRS::{}:{}{{{}\n".format(
self.fromcall, self.tocall.ljust(9),
self._filter_for_send(), str(self.id),)
def __str__(self):
return "From({}) TO({}) - Message({}): '{}'".format(
self.fromcall, self.tocall,
self.id, self.message
)
def ack(self):
"""Build an Ack Message object from this object."""
return AckMessage(self.fromcall, self.tocall, msg_id=self.id)
def _filter_for_send(self):
"""Filter and format message string for FCC."""
# max? ftm400 displays 64, raw msg shows 74
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
message = self.message[:67]
# We all miss George Carlin
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
def send_thread(self):
cl = client.get_client()
for i in range(self.retry_count, 0, -1):
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
LOG.debug(pprint.pformat(ack_dict))
if ack_dict[self.id] != 1:
log_message(
"Sending Message",
repr(self).rstrip("\n"),
self.message,
tocall=self.tocall,
retry_number=i,
)
# tn.write(line)
cl.sendall(repr(self))
# decaying repeats, 31 to 93 second intervals
sleeptime = (self.retry_count - i + 1) * 31
time.sleep(sleeptime)
else:
break
return
# end send_message_thread
def send(self):
global ack_dict
# TODO(Hemna) - Need a better metchanism for this.
# This can nuke an ack_dict while it's still being used.
# FIXME FIXME
if len(ack_dict) > 90:
# empty ack dict if it's really big, could result in key error later
LOG.debug(
"DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict)
)
ack_dict.clear()
LOG.debug(pprint.pformat(ack_dict))
LOG.debug(
"DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict)
)
ack_dict[self.id] = 0 # clear ack for this message number
thread = threading.Thread(target=self.send_thread, name="send_message")
thread.start()
def send_direct(self):
"""Send a message without a separate thread."""
cl = client.get_client()
log_message(
"Sending Message Direct", repr(self).rstrip("\n"), self.message, tocall=self.tocall,
fromcall=self.fromcall
)
cl.sendall(repr(self))
def send_message_thread(tocall, message, this_message_number, retry_count):
cl = client.get_client()
line = "{}>APRS::{}:{}{{{}\n".format(
CONFIG["aprs"]["login"],
tocall,
message,
str(this_message_number),
)
for i in range(retry_count, 0, -1):
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
LOG.debug(pprint.pformat(ack_dict))
if ack_dict[this_message_number] != 1:
class AckMessage(Message):
"""Class for building Acks and sending them."""
def __init__(self, fromcall, tocall, msg_id):
super(AckMessage, self).__init__(fromcall, tocall, msg_id=msg_id)
def __repr__(self):
return "{}>APRS::{}:ack{}\n".format(
self.fromcall, self.tocall.ljust(9), self.id)
def __str__(self):
return "From({}) TO({}) Ack ({})".format(
self.fromcall, self.tocall,
self.id
)
def send_thread(self):
"""Separate thread to send acks with retries."""
cl = client.get_client()
for i in range(self.retry_count, 0, -1):
log_message(
"Sending Message",
line.rstrip("\n"),
message,
tocall=tocall,
"Sending ack",
repr(self).rstrip("\n"),
None,
ack=self.id,
tocall=self.tocall,
retry_number=i,
)
# tn.write(line)
cl.sendall(line)
# decaying repeats, 31 to 93 second intervals
sleeptime = (retry_count - i + 1) * 31
time.sleep(sleeptime)
else:
break
return
# end send_message_thread
cl.sendall(repr(self))
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
time.sleep(31)
# end_send_ack_thread
def send_message(tocall, message):
global message_number
global ack_dict
retry_count = 3
if message_number > 98: # global
message_number = 0
message_number += 1
if len(ack_dict) > 90:
# empty ack dict if it's really big, could result in key error later
LOG.debug(
"DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict)
def send(self):
LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id))
thread = threading.Thread(
target=self.send_thread, name="send_ack"
)
ack_dict.clear()
LOG.debug(pprint.pformat(ack_dict))
LOG.debug(
"DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict)
thread.start()
# end send_ack()
def send_direct(self):
"""Send an ack message without a separate thread."""
cl = client.get_client()
log_message(
"Sending ack",
repr(self).rstrip("\n"),
None,
ack=self.id,
tocall=self.tocall,
fromcall=self.fromcall,
)
ack_dict[message_number] = 0 # clear ack for this message number
tocall = tocall.ljust(9) # pad to nine chars
# max? ftm400 displays 64, raw msg shows 74
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
message = message[:67]
# We all miss George Carlin
message = re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
thread = threading.Thread(
target=send_message_thread,
name="send_message",
args=(tocall, message, message_number, retry_count),
)
thread.start()
return ()
# end send_message()
def send_message_direct(tocall, message, message_number=None):
"""Send a message without a separate thread."""
cl = client.get_client()
if not message_number:
this_message_number = 1
else:
this_message_number = message_number
fromcall = CONFIG["aprs"]["login"]
line = "{}>APRS::{}:{}{{{}\n".format(
fromcall,
tocall,
message,
str(this_message_number),
)
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
log_message(
"Sending Message", line.rstrip("\n"), message, tocall=tocall, fromcall=fromcall
)
cl.sendall(line)
cl.sendall(repr(self))
def log_packet(packet):
@ -236,32 +314,3 @@ def log_message(
log_list.append(" {} _______________ Complete".format(header))
LOG.info("\n".join(log_list))
def process_message(line):
f = re.search("^(.*)>", line)
fromcall = f.group(1)
searchstring = "::%s[ ]*:(.*)" % CONFIG["aprs"]["login"]
# verify this, callsign is padded out with spaces to colon
m = re.search(searchstring, line)
fullmessage = m.group(1)
ack_attached = re.search("(.*){([0-9A-Z]+)", fullmessage)
# ack formats include: {1, {AB}, {12
if ack_attached:
# "{##" suffix means radio wants an ack back
# message content
message = ack_attached.group(1)
# suffix number to use in ack
ack_num = ack_attached.group(2)
else:
message = fullmessage
# ack not requested, but lets send one as 0
ack_num = "0"
log_message(
"Received message", line, message, fromcall=fromcall, msg_num=str(ack_num)
)
return (fromcall, message, ack_num)
# end process_message()