diff --git a/aprsd/client.py b/aprsd/client.py index b3613dd..58b1be4 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -1,4 +1,6 @@ import logging +import select +import socket import time import aprslib @@ -45,7 +47,7 @@ class Client(object): while not connected: try: LOG.info("Creating aprslib client") - aprs_client = aprslib.IS(user, passwd=password, host=host, port=port) + aprs_client = Aprsdis(user, passwd=password, host=host, port=port) # Force the logging to be the same aprs_client.logger = LOG aprs_client.connect() @@ -60,6 +62,63 @@ class Client(object): return aprs_client +class Aprsdis(aprslib.IS): + """Extend the aprslib class so we can exit properly.""" + + # flag to tell us to stop + thread_stop = False + + # timeout in seconds + select_timeout = 10 + + def stop(self): + self.thread_stop = True + + def _socket_readlines(self, blocking=False): + """ + Generator for complete lines, received from the server + """ + try: + self.sock.setblocking(0) + except socket.error as e: + self.logger.error("socket error when setblocking(0): %s" % str(e)) + raise aprslib.ConnectionDrop("connection dropped") + + while not self.thread_stop: + short_buf = b"" + newline = b"\r\n" + + # set a select timeout, so we get a chance to exit + # when user hits CTRL-C + readable, writable, exceptional = select.select( + [self.sock], [], [], self.select_timeout + ) + if not readable: + self.logger.info("nothing to read") + continue + + try: + short_buf = self.sock.recv(4096) + + # sock.recv returns empty if the connection drops + if not short_buf: + self.logger.error("socket.recv(): returned empty") + raise aprslib.ConnectionDrop("connection dropped") + except socket.error as e: + # self.logger.error("socket error on recv(): %s" % str(e)) + if "Resource temporarily unavailable" in str(e): + if not blocking: + if len(self.buf) == 0: + break + + self.buf += short_buf + + while newline in self.buf: + line, self.buf = self.buf.split(newline, 1) + + yield line + + def get_client(): cl = Client() return cl.client diff --git a/aprsd/main.py b/aprsd/main.py index c0aa32b..58dfee0 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -21,12 +21,9 @@ # # python included libs -import concurrent.futures -import functools import logging import os import queue -import random import signal import sys import threading @@ -41,7 +38,7 @@ import yaml # local imports here import aprsd -from aprsd import client, email, messaging, plugin, utils +from aprsd import client, email, messaging, plugin, threads, utils # setup the global logger # logging.basicConfig(level=logging.DEBUG) # level=10 @@ -57,9 +54,7 @@ LOG_LEVELS = { CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) -# Global threading event to trigger stopping all threads -# When user quits app via CTRL-C -event = None +server_threads = [] # localization, please edit: # HOST = "noam.aprs2.net" # north america tier2 servers round robin @@ -151,7 +146,11 @@ def signal_handler(signal, frame): global event LOG.info("Ctrl+C, Sending all threads exit!") + for th in server_threads: + th.stop() sys.exit(0) # thread ignores this + + # end signal_handler @@ -179,108 +178,6 @@ def setup_logging(config, loglevel, quiet): LOG.addHandler(sh) -def process_ack_packet(packet, config): - ack_num = packet.get("msgNo") - LOG.info("Got ack for message {}".format(ack_num)) - messaging.log_message( - "ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"] - ) - messaging.ack_dict.update({int(ack_num): 1}) - return - - -def process_mic_e_packet(packet, config): - LOG.info("Mic-E Packet detected. Currenlty unsupported.") - messaging.log_packet(packet) - return - - -def process_message_packet(packet, config, tx_msg_queue): - LOG.info("Got a message packet") - fromcall = packet["from"] - message = packet.get("message_text", None) - - msg_id = packet.get("msgNo", None) - if not msg_id: - msg_id = "0" - - messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id - ) - - found_command = False - # Get singleton of the PM - pm = plugin.PluginManager() - try: - results = pm.run(fromcall=fromcall, message=message, ack=msg_id) - for reply in results: - found_command = True - # A plugin can return a null message flag which signals - # us that they processed the message correctly, but have - # nothing to reply with, so we avoid replying with a usage string - if reply is not messaging.NULL_MESSAGE: - LOG.debug("Sending '{}'".format(reply)) - - # msg = {"fromcall": fromcall, "msg": reply} - msg = messaging.TextMessage(config["aprs"]["login"], - fromcall, reply) - tx_msg_queue.put(msg) - else: - LOG.debug("Got NULL MESSAGE from plugin") - - if not found_command: - plugins = pm.get_plugins() - names = [x.command_name for x in plugins] - names.sort() - - reply = "Usage: {}".format(", ".join(names)) - # messaging.send_message(fromcall, reply) - msg = messaging.TextMessage(config["aprs"]["login"], - fromcall, reply) - tx_msg_queue.put(msg) - except Exception as ex: - LOG.exception("Plugin failed!!!", ex) - reply = "A Plugin failed! try again?" - # messaging.send_message(fromcall, reply) - msg = messaging.TextMessage(config["aprs"]["login"], - fromcall, reply) - tx_msg_queue.put(msg) - - # let any threads do their thing, then ack - # send an ack last - ack = messaging.AckMessage(config["aprs"]["login"], fromcall, msg_id=msg_id) - ack.send() - LOG.debug("Packet processing complete") - - -def process_packet(packet, config=None, msg_queues=None, event=None): - """Process a packet recieved from aprs-is server.""" - - LOG.debug("Process packet! {}".format(msg_queues)) - try: - LOG.debug("Got message: {}".format(packet)) - - msg = packet.get("message_text", None) - msg_format = packet.get("format", None) - msg_response = packet.get("response", None) - if msg_format == "message" and msg: - # we want to send the message through the - # plugins - process_message_packet(packet, config, msg_queues["tx"]) - return - elif msg_response == "ack": - process_ack_packet(packet, config) - return - - if msg_format == "mic-e": - # process a mic-e packet - process_mic_e_packet(packet, config) - return - - except (aprslib.ParseError, aprslib.UnknownFormat) as exp: - LOG.exception("Failed to parse packet from aprs-is", exp) - - @main.command() def sample_config(): """This dumps the config to stdout.""" @@ -345,7 +242,6 @@ def send_message( setup_logging(config, loglevel, quiet) LOG.info("APRSD Started version: {}".format(aprsd.__version__)) - message_number = random.randint(1, 90) if type(command) is tuple: command = " ".join(command) LOG.info("Sending Command '{}'".format(command)) @@ -368,12 +264,17 @@ def send_message( fromcall = packet["from"] msg_number = packet.get("msgNo", "0") messaging.log_message( - "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_number + "Received Message", + packet["raw"], + message, + fromcall=fromcall, + ack=msg_number, ) got_response = True # Send the ack back? - ack = messaging.AckMessage(config["aprs"]["login"], - fromcall, msg_id=msg_number) + ack = messaging.AckMessage( + config["aprs"]["login"], fromcall, msg_id=msg_number + ) ack.send_direct() if got_ack and got_response: @@ -388,7 +289,7 @@ def send_message( # message msg = messaging.TextMessage(aprs_login, tocallsign, command) msg.send_direct() - #messaging.send_message_direct(tocallsign, command, message_number) + # messaging.send_message_direct(tocallsign, command, message_number) try: # This will register a packet consumer with aprslib @@ -405,16 +306,6 @@ def send_message( cl.reset() -def tx_msg_thread(msg_queues=None, event=None): - """Thread to handle sending any messages outbound.""" - LOG.info("TX_MSG_THREAD") - while not event.is_set() or not msg_queues["tx"].empty(): - msg = msg_queues["tx"].get() - LOG.info("TXQ: got message '{}'".format(msg)) - msg.send() - #messaging.send_message(msg["fromcall"], msg["msg"]) - - # main() ### @main.command() @click.option( @@ -474,55 +365,24 @@ def server(loglevel, quiet, disable_validation, config_file): # Create the initial PM singleton and Register plugins plugin_manager = plugin.PluginManager(config) plugin_manager.setup_plugins() - cl = client.Client(config) + client.Client(config) rx_msg_queue = queue.Queue(maxsize=20) tx_msg_queue = queue.Queue(maxsize=20) - msg_queues = {"rx": rx_msg_queue, - "tx": tx_msg_queue,} - - rx_msg = threading.Thread( - target=rx_msg_thread, name="RX_msg", kwargs={'msg_queues':msg_queues, - 'event': event} - ) - tx_msg = threading.Thread( - target=tx_msg_thread, name="TX_msg", kwargs={'msg_queues':msg_queues, - 'event': event} - ) - rx_msg.start() - tx_msg.start() - - while True: - # Now use the helper which uses the singleton - aprs_client = client.get_client() - - # setup the consumer of messages and block until a messages - try: - # This will register a packet consumer with aprslib - # When new packets come in the consumer will process - # the packet - - # Do a partial here because the consumer signature doesn't allow - # For kwargs to be passed in to the consumer func we declare - # and the aprslib developer didn't want to allow a PR to add - # kwargs. :( - # https://github.com/rossengeorgiev/aprs-python/pull/56 - process_partial = functools.partial(process_packet, - msg_queues=msg_queues, - event=event, config=config) - aprs_client.consumer(process_partial, raw=False) - except aprslib.exceptions.ConnectionDrop: - LOG.error("Connection dropped, reconnecting") - time.sleep(5) - # Force the deletion of the client object connected to aprs - # This will cause a reconnect, next time client.get_client() - # is called - client.Client().reset() + msg_queues = { + "rx": rx_msg_queue, + "tx": tx_msg_queue, + } + rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config) + tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config) + # TODO(hemna): add EmailThread + server_threads.append(rx_thread) + server_threads.append(tx_thread) + rx_thread.start() + tx_thread.start() LOG.info("APRSD Exiting.") - tx_msg.join() - sys.exit(0) # setup and run the main blocking loop diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 25c3c85..ded7ac7 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -1,11 +1,11 @@ import abc import logging -from multiprocessing import RawValue import pprint import re import threading import time import uuid +from multiprocessing import RawValue from aprsd import client @@ -20,6 +20,7 @@ ack_dict = {} # and it's ok, but don't send a usage string back NULL_MESSAGE = -1 + class MessageCounter(object): """ Global message id counter class. @@ -31,13 +32,14 @@ class MessageCounter(object): from the MessageCounter. """ + _instance = None def __new__(cls, *args, **kwargs): """Make this a singleton class.""" if cls._instance is None: cls._instance = super(MessageCounter, cls).__new__(cls) - cls._instance.val = RawValue('i', 1) + cls._instance.val = RawValue("i", 1) cls._instance.lock = threading.Lock() return cls._instance @@ -61,6 +63,7 @@ class MessageCounter(object): class Message(object, metaclass=abc.ABCMeta): """Base Message Class.""" + # The message id to send over the air id = 0 @@ -102,14 +105,16 @@ class TextMessage(Message): def __repr__(self): """Build raw string to send over the air.""" return "{}>APRS::{}:{}{{{}\n".format( - self.fromcall, self.tocall.ljust(9), - self._filter_for_send(), str(self.id),) + self.fromcall, + self.tocall.ljust(9), + self._filter_for_send(), + str(self.id), + ) def __str__(self): return "From({}) TO({}) - Message({}): '{}'".format( - self.fromcall, self.tocall, - self.id, self.message - ) + self.fromcall, self.tocall, self.id, self.message + ) def ack(self): """Build an Ack Message object from this object.""" @@ -162,7 +167,8 @@ class TextMessage(Message): ack_dict.clear() LOG.debug(pprint.pformat(ack_dict)) LOG.debug( - "DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict) + "DEBUG: Cleared ack dictionary, ack_dict length is now %s." + % len(ack_dict) ) ack_dict[self.id] = 0 # clear ack for this message number @@ -173,8 +179,11 @@ class TextMessage(Message): """Send a message without a separate thread.""" cl = client.get_client() log_message( - "Sending Message Direct", repr(self).rstrip("\n"), self.message, tocall=self.tocall, - fromcall=self.fromcall + "Sending Message Direct", + repr(self).rstrip("\n"), + self.message, + tocall=self.tocall, + fromcall=self.fromcall, ) cl.sendall(repr(self)) @@ -182,19 +191,16 @@ class TextMessage(Message): class AckMessage(Message): """Class for building Acks and sending them.""" - def __init__(self, fromcall, tocall, msg_id): super(AckMessage, self).__init__(fromcall, tocall, msg_id=msg_id) def __repr__(self): return "{}>APRS::{}:ack{}\n".format( - self.fromcall, self.tocall.ljust(9), self.id) + self.fromcall, self.tocall.ljust(9), self.id + ) def __str__(self): - return "From({}) TO({}) Ack ({})".format( - self.fromcall, self.tocall, - self.id - ) + return "From({}) TO({}) Ack ({})".format(self.fromcall, self.tocall, self.id) def send_thread(self): """Separate thread to send acks with retries.""" @@ -216,10 +222,9 @@ class AckMessage(Message): def send(self): LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id)) - thread = threading.Thread( - target=self.send_thread, name="send_ack" - ) + thread = threading.Thread(target=self.send_thread, name="send_ack") thread.start() + # end send_ack() def send_direct(self): diff --git a/aprsd/threads.py b/aprsd/threads.py new file mode 100644 index 0000000..2b666cb --- /dev/null +++ b/aprsd/threads.py @@ -0,0 +1,186 @@ +import logging +import queue +import threading +import time + +import aprslib + +from aprsd import client, messaging, plugin + +LOG = logging.getLogger("APRSD") + + +class APRSDThread(threading.Thread): + def __init__(self, name, msg_queues, config): + super(APRSDThread, self).__init__(name=name) + self.msg_queues = msg_queues + self.config = config + self.thread_stop = False + + def stop(self): + self.thread_stop = True + + def run(self): + while not self.thread_stop: + self._run() + + +class APRSDRXThread(APRSDThread): + def __init__(self, msg_queues, config): + super(APRSDRXThread, self).__init__("RX_MSG", msg_queues, config) + self.thread_stop = False + + def stop(self): + self.thread_stop = True + self.aprs.stop() + + def callback(self, packet): + try: + packet = aprslib.parse(packet) + print(packet) + except (aprslib.ParseError, aprslib.UnknownFormat): + pass + + def run(self): + LOG.info("Starting") + while not self.thread_stop: + aprs_client = client.get_client() + + # setup the consumer of messages and block until a messages + try: + # This will register a packet consumer with aprslib + # When new packets come in the consumer will process + # the packet + + # Do a partial here because the consumer signature doesn't allow + # For kwargs to be passed in to the consumer func we declare + # and the aprslib developer didn't want to allow a PR to add + # kwargs. :( + # https://github.com/rossengeorgiev/aprs-python/pull/56 + aprs_client.consumer(self.process_packet, raw=False, blocking=False) + + except aprslib.exceptions.ConnectionDrop: + LOG.error("Connection dropped, reconnecting") + time.sleep(5) + # Force the deletion of the client object connected to aprs + # This will cause a reconnect, next time client.get_client() + # is called + client.Client().reset() + LOG.info("Exiting ") + + def process_ack_packet(self, packet): + ack_num = packet.get("msgNo") + LOG.info("Got ack for message {}".format(ack_num)) + messaging.log_message( + "ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"] + ) + messaging.ack_dict.update({int(ack_num): 1}) + return + + def process_mic_e_packet(self, packet): + LOG.info("Mic-E Packet detected. Currenlty unsupported.") + messaging.log_packet(packet) + return + + def process_message_packet(self, packet): + LOG.info("Got a message packet") + fromcall = packet["from"] + message = packet.get("message_text", None) + + msg_id = packet.get("msgNo", None) + if not msg_id: + msg_id = "0" + + messaging.log_message( + "Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id + ) + + found_command = False + # Get singleton of the PM + pm = plugin.PluginManager() + try: + results = pm.run(fromcall=fromcall, message=message, ack=msg_id) + for reply in results: + found_command = True + # A plugin can return a null message flag which signals + # us that they processed the message correctly, but have + # nothing to reply with, so we avoid replying with a usage string + if reply is not messaging.NULL_MESSAGE: + LOG.debug("Sending '{}'".format(reply)) + + # msg = {"fromcall": fromcall, "msg": reply} + msg = messaging.TextMessage( + self.config["aprs"]["login"], fromcall, reply + ) + self.msg_queues["tx"].put(msg) + else: + LOG.debug("Got NULL MESSAGE from plugin") + + if not found_command: + plugins = pm.get_plugins() + names = [x.command_name for x in plugins] + names.sort() + + reply = "Usage: {}".format(", ".join(names)) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage( + self.config["aprs"]["login"], fromcall, reply + ) + self.msg_queues["tx"].put(msg) + except Exception as ex: + LOG.exception("Plugin failed!!!", ex) + reply = "A Plugin failed! try again?" + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply) + self.msg_queues["tx"].put(msg) + + # let any threads do their thing, then ack + # send an ack last + ack = messaging.AckMessage( + self.config["aprs"]["login"], fromcall, msg_id=msg_id + ) + ack.send() + LOG.debug("Packet processing complete") + + def process_packet(self, packet): + """Process a packet recieved from aprs-is server.""" + + LOG.debug("Process packet! {}".format(self.msg_queues)) + try: + LOG.debug("Got message: {}".format(packet)) + + msg = packet.get("message_text", None) + msg_format = packet.get("format", None) + msg_response = packet.get("response", None) + if msg_format == "message" and msg: + # we want to send the message through the + # plugins + self.process_message_packet(packet) + return + elif msg_response == "ack": + self.process_ack_packet(packet) + return + + if msg_format == "mic-e": + # process a mic-e packet + self.process_mic_e_packet(packet) + return + + except (aprslib.ParseError, aprslib.UnknownFormat) as exp: + LOG.exception("Failed to parse packet from aprs-is", exp) + + +class APRSDTXThread(APRSDThread): + def __init__(self, msg_queues, config): + super(APRSDTXThread, self).__init__("TX_MSG", msg_queues, config) + + def run(self): + LOG.info("Starting") + while not self.thread_stop: + try: + msg = self.msg_queues["tx"].get(timeout=0.1) + LOG.info("TXQ: got message '{}'".format(msg)) + msg.send() + except queue.Empty: + pass + LOG.info("Exiting ")