mirror of
https://github.com/craigerl/aprsd.git
synced 2024-11-24 08:58:49 -05:00
Compare commits
4 Commits
c85c87daff
...
ec7aeeeaf3
Author | SHA1 | Date | |
---|---|---|---|
ec7aeeeaf3 | |||
dcf0093d45 | |||
ab421b87e6 | |||
87cbcaa47f |
1
.github/workflows/master-build.yml
vendored
1
.github/workflows/master-build.yml
vendored
@ -56,6 +56,7 @@ jobs:
|
||||
file: ./Dockerfile-dev
|
||||
build-args: |
|
||||
BRANCH=${{ steps.branch-name.outputs.current_branch }}
|
||||
BUILDX_QEMU_ENV=true
|
||||
push: true
|
||||
tags: |
|
||||
hemna6969/aprsd:${{ steps.branch-name.outputs.current_branch }}
|
||||
|
3
.github/workflows/release_build.yml
vendored
3
.github/workflows/release_build.yml
vendored
@ -39,10 +39,11 @@ jobs:
|
||||
uses: docker/build-push-action@v3
|
||||
with:
|
||||
context: "{{defaultContext}}:docker"
|
||||
platforms: linux/amd64,linux/arm64
|
||||
platforms: linux/amd64,linux/arm64,linux/arm/v7
|
||||
file: ./Dockerfile
|
||||
build-args: |
|
||||
VERSION=${{ inputs.aprsd_version }}
|
||||
BUILDX_QEMU_ENV=true
|
||||
push: true
|
||||
tags: |
|
||||
hemna6969/aprsd:v${{ inputs.aprsd_version }}
|
||||
|
@ -34,7 +34,7 @@ import click_completion
|
||||
import aprsd
|
||||
from aprsd import cli_helper
|
||||
from aprsd import config as aprsd_config
|
||||
from aprsd import messaging, packets, stats, threads, utils
|
||||
from aprsd import packets, stats, threads, utils
|
||||
|
||||
|
||||
# setup the global logger
|
||||
@ -85,7 +85,7 @@ def signal_handler(sig, frame):
|
||||
),
|
||||
)
|
||||
time.sleep(1.5)
|
||||
messaging.MsgTrack().save()
|
||||
packets.PacketTrack().save()
|
||||
packets.WatchList().save()
|
||||
packets.SeenList().save()
|
||||
LOG.info(stats.APRSDStats())
|
||||
|
@ -8,6 +8,7 @@ from aprslib.exceptions import LoginError
|
||||
from aprsd import config as aprsd_config
|
||||
from aprsd import exception
|
||||
from aprsd.clients import aprsis, kiss
|
||||
from aprsd.packets import core
|
||||
from aprsd.utils import trace
|
||||
|
||||
|
||||
@ -31,6 +32,7 @@ class Client:
|
||||
|
||||
connected = False
|
||||
server_string = None
|
||||
filter = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
"""This magic turns this into a singleton."""
|
||||
@ -44,10 +46,17 @@ class Client:
|
||||
if config:
|
||||
self.config = config
|
||||
|
||||
def set_filter(self, filter):
|
||||
self.filter = filter
|
||||
if self._client:
|
||||
self._client.set_filter(filter)
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
if not self._client:
|
||||
self._client = self.setup_connection()
|
||||
if self.filter:
|
||||
self._client.set_filter(self.filter)
|
||||
return self._client
|
||||
|
||||
def reset(self):
|
||||
@ -101,7 +110,7 @@ class APRSISClient(Client):
|
||||
|
||||
def decode_packet(self, *args, **kwargs):
|
||||
"""APRS lib already decodes this."""
|
||||
return args[0]
|
||||
return core.Packet.factory(args[0])
|
||||
|
||||
@trace.trace
|
||||
def setup_connection(self):
|
||||
@ -190,8 +199,8 @@ class KISSClient(Client):
|
||||
# msg = frame.tnc2
|
||||
LOG.debug(f"Decoding {msg}")
|
||||
|
||||
packet = aprslib.parse(msg)
|
||||
return packet
|
||||
raw = aprslib.parse(msg)
|
||||
return core.Packet.factory(raw)
|
||||
|
||||
@trace.trace
|
||||
def setup_connection(self):
|
||||
|
@ -1,6 +1,5 @@
|
||||
import logging
|
||||
import select
|
||||
import socket
|
||||
import threading
|
||||
|
||||
import aprslib
|
||||
@ -32,23 +31,6 @@ class Aprsdis(aprslib.IS):
|
||||
self.thread_stop = True
|
||||
LOG.info("Shutdown Aprsdis client.")
|
||||
|
||||
def is_socket_closed(self, sock: socket.socket) -> bool:
|
||||
try:
|
||||
# this will try to read bytes without blocking and also without removing them from buffer (peek only)
|
||||
data = sock.recv(16, socket.MSG_DONTWAIT | socket.MSG_PEEK)
|
||||
if len(data) == 0:
|
||||
return True
|
||||
except BlockingIOError:
|
||||
return False # socket is open and reading from it would block
|
||||
except ConnectionResetError:
|
||||
return True # socket was closed for some other reason
|
||||
except Exception:
|
||||
self.logger.exception(
|
||||
"unexpected exception when checking if a socket is closed",
|
||||
)
|
||||
return False
|
||||
return False
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def send(self, msg):
|
||||
"""Send an APRS Message object."""
|
||||
|
@ -5,18 +5,18 @@
|
||||
# python included libs
|
||||
import datetime
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
|
||||
import aprslib
|
||||
import click
|
||||
from rich.console import Console
|
||||
|
||||
# local imports here
|
||||
import aprsd
|
||||
from aprsd import cli_helper, client, messaging, packets, stats, threads, utils
|
||||
from aprsd import cli_helper, client, packets, stats, threads, utils
|
||||
from aprsd.aprsd import cli
|
||||
from aprsd.utils import trace
|
||||
from aprsd.threads import rx
|
||||
|
||||
|
||||
# setup the global logger
|
||||
@ -37,6 +37,12 @@ def signal_handler(sig, frame):
|
||||
LOG.info(stats.APRSDStats())
|
||||
|
||||
|
||||
class APRSDListenThread(rx.APRSDRXThread):
|
||||
def process_packet(self, *args, **kwargs):
|
||||
packet = self._client.decode_packet(*args, **kwargs)
|
||||
packet.log(header="RX Packet")
|
||||
|
||||
|
||||
@cli.command()
|
||||
@cli_helper.add_options(cli_helper.common_options)
|
||||
@click.option(
|
||||
@ -74,6 +80,8 @@ def listen(
|
||||
o/obj1/obj2... - Object Filter Pass all objects with the exact name of obj1, obj2, ... (* wild card allowed)\n
|
||||
|
||||
"""
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
config = ctx.obj["config"]
|
||||
|
||||
if not aprs_login:
|
||||
@ -105,30 +113,10 @@ def listen(
|
||||
|
||||
# Try and load saved MsgTrack list
|
||||
LOG.debug("Loading saved MsgTrack object.")
|
||||
messaging.MsgTrack(config=config).load()
|
||||
packets.PacketTrack(config=config).load()
|
||||
packets.WatchList(config=config).load()
|
||||
packets.SeenList(config=config).load()
|
||||
|
||||
@trace.trace
|
||||
def rx_packet(packet):
|
||||
resp = packet.get("response", None)
|
||||
if resp == "ack":
|
||||
ack_num = packet.get("msgNo")
|
||||
console.log(f"We saw an ACK {ack_num} Ignoring")
|
||||
messaging.log_packet(packet)
|
||||
else:
|
||||
message = packet.get("message_text", None)
|
||||
fromcall = packet["from"]
|
||||
msg_number = packet.get("msgNo", "0")
|
||||
messaging.log_message(
|
||||
"Received Message",
|
||||
packet["raw"],
|
||||
message,
|
||||
fromcall=fromcall,
|
||||
ack=msg_number,
|
||||
console=console,
|
||||
)
|
||||
|
||||
# Initialize the client factory and create
|
||||
# The correct client object ready for use
|
||||
client.ClientFactory.setup(config)
|
||||
@ -140,29 +128,21 @@ def listen(
|
||||
# Creates the client object
|
||||
LOG.info("Creating client connection")
|
||||
aprs_client = client.factory.create()
|
||||
console.log(aprs_client)
|
||||
LOG.info(aprs_client)
|
||||
|
||||
LOG.debug(f"Filter by '{filter}'")
|
||||
aprs_client.client.set_filter(filter)
|
||||
aprs_client.set_filter(filter)
|
||||
|
||||
packets.PacketList(config=config)
|
||||
|
||||
keepalive = threads.KeepAliveThread(config=config)
|
||||
keepalive.start()
|
||||
|
||||
while True:
|
||||
try:
|
||||
# This will register a packet consumer with aprslib
|
||||
# When new packets come in the consumer will process
|
||||
# the packet
|
||||
# with console.status("Listening for packets"):
|
||||
aprs_client.client.consumer(rx_packet, raw=False)
|
||||
except aprslib.exceptions.ConnectionDrop:
|
||||
LOG.error("Connection dropped, reconnecting")
|
||||
time.sleep(5)
|
||||
# Force the deletion of the client object connected to aprs
|
||||
# This will cause a reconnect, next time client.get_client()
|
||||
# is called
|
||||
aprs_client.reset()
|
||||
except aprslib.exceptions.UnknownFormat:
|
||||
LOG.error("Got a Bad packet")
|
||||
LOG.debug("Create APRSDListenThread")
|
||||
listen_thread = APRSDListenThread(threads.msg_queues, config=config)
|
||||
LOG.debug("Start APRSDListenThread")
|
||||
listen_thread.start()
|
||||
LOG.debug("keepalive Join")
|
||||
keepalive.join()
|
||||
LOG.debug("listen_thread Join")
|
||||
listen_thread.join()
|
||||
|
@ -7,7 +7,7 @@ from aprslib.exceptions import LoginError
|
||||
import click
|
||||
|
||||
import aprsd
|
||||
from aprsd import cli_helper, client, messaging, packets
|
||||
from aprsd import cli_helper, client, packets
|
||||
from aprsd.aprsd import cli
|
||||
|
||||
|
||||
@ -98,32 +98,22 @@ def send_message(
|
||||
|
||||
def rx_packet(packet):
|
||||
global got_ack, got_response
|
||||
cl = client.factory.create()
|
||||
packet = cl.decode_packet(packet)
|
||||
packet.log("RX_PKT")
|
||||
# LOG.debug("Got packet back {}".format(packet))
|
||||
resp = packet.get("response", None)
|
||||
if resp == "ack":
|
||||
ack_num = packet.get("msgNo")
|
||||
LOG.info(f"We got ack for our sent message {ack_num}")
|
||||
messaging.log_packet(packet)
|
||||
if isinstance(packet, packets.AckPacket):
|
||||
got_ack = True
|
||||
else:
|
||||
message = packet.get("message_text", None)
|
||||
fromcall = packet["from"]
|
||||
msg_number = packet.get("msgNo", "0")
|
||||
messaging.log_message(
|
||||
"Received Message",
|
||||
packet["raw"],
|
||||
message,
|
||||
fromcall=fromcall,
|
||||
ack=msg_number,
|
||||
)
|
||||
got_response = True
|
||||
# Send the ack back?
|
||||
ack = messaging.AckMessage(
|
||||
config["aprs"]["login"],
|
||||
fromcall,
|
||||
msg_id=msg_number,
|
||||
from_call = packet.from_call
|
||||
our_call = config["aprsd"]["callsign"].lower()
|
||||
ack_pkt = packets.AckPacket(
|
||||
from_call=our_call,
|
||||
to_call=from_call,
|
||||
msgNo=packet.msgNo,
|
||||
)
|
||||
ack.send_direct()
|
||||
ack_pkt.send_direct()
|
||||
|
||||
if got_ack:
|
||||
if wait_response:
|
||||
@ -144,12 +134,16 @@ def send_message(
|
||||
# we should bail after we get the ack and send an ack back for the
|
||||
# message
|
||||
if raw:
|
||||
msg = messaging.RawMessage(raw)
|
||||
msg.send_direct()
|
||||
pkt = packets.Packet(from_call="", to_call="", raw=raw)
|
||||
pkt.send_direct()
|
||||
sys.exit(0)
|
||||
else:
|
||||
msg = messaging.TextMessage(aprs_login, tocallsign, command)
|
||||
msg.send_direct()
|
||||
pkt = packets.MessagePacket(
|
||||
from_call=aprs_login,
|
||||
to_call=tocallsign,
|
||||
message_text=command,
|
||||
)
|
||||
pkt.send_direct()
|
||||
|
||||
if no_ack:
|
||||
sys.exit(0)
|
||||
|
@ -6,8 +6,7 @@ import click
|
||||
|
||||
import aprsd
|
||||
from aprsd import (
|
||||
cli_helper, client, flask, messaging, packets, plugin, stats, threads,
|
||||
utils,
|
||||
cli_helper, client, flask, packets, plugin, stats, threads, utils,
|
||||
)
|
||||
from aprsd import aprsd as aprsd_main
|
||||
from aprsd.aprsd import cli
|
||||
@ -81,13 +80,15 @@ def server(ctx, flush):
|
||||
packets.PacketList(config=config)
|
||||
if flush:
|
||||
LOG.debug("Deleting saved MsgTrack.")
|
||||
messaging.MsgTrack(config=config).flush()
|
||||
#messaging.MsgTrack(config=config).flush()
|
||||
packets.PacketTrack(config=config).flush()
|
||||
packets.WatchList(config=config)
|
||||
packets.SeenList(config=config)
|
||||
else:
|
||||
# Try and load saved MsgTrack list
|
||||
LOG.debug("Loading saved MsgTrack object.")
|
||||
messaging.MsgTrack(config=config).load()
|
||||
#messaging.MsgTrack(config=config).load()
|
||||
packets.PacketTrack(config=config).load()
|
||||
packets.WatchList(config=config).load()
|
||||
packets.SeenList(config=config).load()
|
||||
|
||||
@ -102,7 +103,8 @@ def server(ctx, flush):
|
||||
)
|
||||
rx_thread.start()
|
||||
|
||||
messaging.MsgTrack().restart()
|
||||
#messaging.MsgTrack().restart()
|
||||
packets.PacketTrack().restart()
|
||||
|
||||
keepalive = threads.KeepAliveThread(config=config)
|
||||
keepalive.start()
|
||||
|
@ -22,7 +22,7 @@ import wrapt
|
||||
import aprsd
|
||||
from aprsd import cli_helper, client
|
||||
from aprsd import config as aprsd_config
|
||||
from aprsd import messaging, packets, stats, threads, utils
|
||||
from aprsd import packets, stats, threads, utils
|
||||
from aprsd.aprsd import cli
|
||||
from aprsd.logging import rich as aprsd_logging
|
||||
from aprsd.threads import rx
|
||||
@ -44,13 +44,11 @@ def signal_handler(sig, frame):
|
||||
threads.APRSDThreadList().stop_all()
|
||||
if "subprocess" not in str(frame):
|
||||
time.sleep(1.5)
|
||||
# messaging.MsgTrack().save()
|
||||
# packets.WatchList().save()
|
||||
# packets.SeenList().save()
|
||||
LOG.info(stats.APRSDStats())
|
||||
LOG.info("Telling flask to bail.")
|
||||
signal.signal(signal.SIGTERM, sys.exit(0))
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
class SentMessages(objectstore.ObjectStoreMixin):
|
||||
@ -67,11 +65,11 @@ class SentMessages(objectstore.ObjectStoreMixin):
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def add(self, msg):
|
||||
self.data[msg.id] = self.create(msg.id)
|
||||
self.data[msg.id]["from"] = msg.fromcall
|
||||
self.data[msg.id]["to"] = msg.tocall
|
||||
self.data[msg.id]["message"] = msg.message.rstrip("\n")
|
||||
self.data[msg.id]["raw"] = str(msg).rstrip("\n")
|
||||
self.data[msg.msgNo] = self.create(msg.msgNo)
|
||||
self.data[msg.msgNo]["from"] = msg.from_call
|
||||
self.data[msg.msgNo]["to"] = msg.to_call
|
||||
self.data[msg.msgNo]["message"] = msg.message_text.rstrip("\n")
|
||||
self.data[msg.msgNo]["raw"] = msg.message_text.rstrip("\n")
|
||||
|
||||
def create(self, id):
|
||||
return {
|
||||
@ -168,7 +166,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
|
||||
self.connected = False
|
||||
super().__init__(config, packet)
|
||||
|
||||
def process_ack_packet(self, packet):
|
||||
def process_ack_packet(self, packet: packets.AckPacket):
|
||||
super().process_ack_packet(packet)
|
||||
ack_num = packet.get("msgNo")
|
||||
SentMessages().ack(int(ack_num))
|
||||
@ -178,21 +176,21 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread):
|
||||
)
|
||||
self.got_ack = True
|
||||
|
||||
def process_non_ack_packet(self, packet):
|
||||
def process_our_message_packet(self, packet: packets.MessagePacket):
|
||||
LOG.info(f"process non ack PACKET {packet}")
|
||||
packet.get("addresse", None)
|
||||
fromcall = packet["from"]
|
||||
fromcall = packet.from_call
|
||||
|
||||
packets.PacketList().add(packet)
|
||||
stats.APRSDStats().msgs_rx_inc()
|
||||
message = packet.get("message_text", None)
|
||||
msg = {
|
||||
"id": 0,
|
||||
"ts": time.time(),
|
||||
"ts": packet.get("timestamp", time.time()),
|
||||
"ack": False,
|
||||
"from": fromcall,
|
||||
"to": packet["to"],
|
||||
"raw": packet["raw"],
|
||||
"to": packet.to_call,
|
||||
"raw": packet.raw,
|
||||
"message": message,
|
||||
"status": None,
|
||||
"last_update": None,
|
||||
@ -344,21 +342,21 @@ class SendMessageNamespace(Namespace):
|
||||
LOG.debug(f"WS: on_send {data}")
|
||||
self.request = data
|
||||
data["from"] = self._config["aprs"]["login"]
|
||||
msg = messaging.TextMessage(
|
||||
data["from"],
|
||||
data["to"].upper(),
|
||||
data["message"],
|
||||
pkt = packets.MessagePacket(
|
||||
from_call=data["from"],
|
||||
to_call=data["to"].upper(),
|
||||
message_text=data["message"],
|
||||
)
|
||||
self.msg = msg
|
||||
self.msg = pkt
|
||||
msgs = SentMessages()
|
||||
msgs.add(msg)
|
||||
msgs.set_status(msg.id, "Sending")
|
||||
obj = msgs.get(self.msg.id)
|
||||
msgs.add(pkt)
|
||||
pkt.send()
|
||||
msgs.set_status(pkt.msgNo, "Sending")
|
||||
obj = msgs.get(pkt.msgNo)
|
||||
socketio.emit(
|
||||
"sent", obj,
|
||||
namespace="/sendmsg",
|
||||
)
|
||||
msg.send()
|
||||
|
||||
def on_gps(self, data):
|
||||
LOG.debug(f"WS on_GPS: {data}")
|
||||
@ -381,10 +379,16 @@ class SendMessageNamespace(Namespace):
|
||||
txt = f"@{time_zulu}z{lat}1{long}$APRSD WebChat Beacon"
|
||||
|
||||
LOG.debug(f"Sending {txt}")
|
||||
beacon_msg = messaging.RawMessage(txt)
|
||||
beacon_msg.fromcall = self._config["aprs"]["login"]
|
||||
beacon_msg.tocall = "APDW16"
|
||||
beacon_msg.send_direct()
|
||||
beacon = packets.GPSPacket(
|
||||
from_call=self._config["aprs"]["login"],
|
||||
to_call="APDW16",
|
||||
raw=txt,
|
||||
)
|
||||
beacon.send_direct()
|
||||
#beacon_msg = messaging.RawMessage(txt)
|
||||
#beacon_msg.fromcall = self._config["aprs"]["login"]
|
||||
#beacon_msg.tocall = "APDW16"
|
||||
#beacon_msg.send_direct()
|
||||
|
||||
def handle_message(self, data):
|
||||
LOG.debug(f"WS Data {data}")
|
||||
@ -537,7 +541,7 @@ def webchat(ctx, flush, port):
|
||||
sys.exit(-1)
|
||||
|
||||
packets.PacketList(config=config)
|
||||
messaging.MsgTrack(config=config)
|
||||
packets.PacketTrack(config=config)
|
||||
packets.WatchList(config=config)
|
||||
packets.SeenList(config=config)
|
||||
|
||||
|
@ -1,584 +1,3 @@
|
||||
import abc
|
||||
import datetime
|
||||
import logging
|
||||
from multiprocessing import RawValue
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
|
||||
from aprsd import client, packets, stats, threads
|
||||
from aprsd.utils import objectstore
|
||||
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
|
||||
# What to return from a plugin if we have processed the message
|
||||
# and it's ok, but don't send a usage string back
|
||||
NULL_MESSAGE = -1
|
||||
|
||||
|
||||
class MsgTrack(objectstore.ObjectStoreMixin):
|
||||
"""Class to keep track of outstanding text messages.
|
||||
|
||||
This is a thread safe class that keeps track of active
|
||||
messages.
|
||||
|
||||
When a message is asked to be sent, it is placed into this
|
||||
class via it's id. The TextMessage class's send() method
|
||||
automatically adds itself to this class. When the ack is
|
||||
recieved from the radio, the message object is removed from
|
||||
this class.
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
_start_time = None
|
||||
lock = None
|
||||
|
||||
data = {}
|
||||
total_messages_tracked = 0
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance.track = {}
|
||||
cls._instance._start_time = datetime.datetime.now()
|
||||
cls._instance.lock = threading.Lock()
|
||||
cls._instance.config = kwargs["config"]
|
||||
cls._instance._init_store()
|
||||
return cls._instance
|
||||
|
||||
def __getitem__(self, name):
|
||||
with self.lock:
|
||||
return self.data[name]
|
||||
|
||||
def __iter__(self):
|
||||
with self.lock:
|
||||
return iter(self.data)
|
||||
|
||||
def keys(self):
|
||||
with self.lock:
|
||||
return self.data.keys()
|
||||
|
||||
def items(self):
|
||||
with self.lock:
|
||||
return self.data.items()
|
||||
|
||||
def values(self):
|
||||
with self.lock:
|
||||
return self.data.values()
|
||||
|
||||
def __len__(self):
|
||||
with self.lock:
|
||||
return len(self.data)
|
||||
|
||||
def __str__(self):
|
||||
with self.lock:
|
||||
result = "{"
|
||||
for key in self.data.keys():
|
||||
result += f"{key}: {str(self.data[key])}, "
|
||||
result += "}"
|
||||
return result
|
||||
|
||||
def add(self, msg):
|
||||
with self.lock:
|
||||
key = int(msg.id)
|
||||
self.data[key] = msg
|
||||
stats.APRSDStats().msgs_tracked_inc()
|
||||
self.total_messages_tracked += 1
|
||||
|
||||
def get(self, id):
|
||||
with self.lock:
|
||||
if id in self.data:
|
||||
return self.data[id]
|
||||
|
||||
def remove(self, id):
|
||||
with self.lock:
|
||||
key = int(id)
|
||||
if key in self.data.keys():
|
||||
del self.data[key]
|
||||
|
||||
def restart(self):
|
||||
"""Walk the list of messages and restart them if any."""
|
||||
|
||||
for key in self.data.keys():
|
||||
msg = self.data[key]
|
||||
if msg.last_send_attempt < msg.retry_count:
|
||||
msg.send()
|
||||
|
||||
def _resend(self, msg):
|
||||
msg.last_send_attempt = 0
|
||||
msg.send()
|
||||
|
||||
def restart_delayed(self, count=None, most_recent=True):
|
||||
"""Walk the list of delayed messages and restart them if any."""
|
||||
if not count:
|
||||
# Send all the delayed messages
|
||||
for key in self.data.keys():
|
||||
msg = self.data[key]
|
||||
if msg.last_send_attempt == msg.retry_count:
|
||||
self._resend(msg)
|
||||
else:
|
||||
# They want to resend <count> delayed messages
|
||||
tmp = sorted(
|
||||
self.data.items(),
|
||||
reverse=most_recent,
|
||||
key=lambda x: x[1].last_send_time,
|
||||
)
|
||||
msg_list = tmp[:count]
|
||||
for (_key, msg) in msg_list:
|
||||
self._resend(msg)
|
||||
|
||||
|
||||
class MessageCounter:
|
||||
"""
|
||||
Global message id counter class.
|
||||
|
||||
This is a singleton based class that keeps
|
||||
an incrementing counter for all messages to
|
||||
be sent. All new Message objects gets a new
|
||||
message id, which is the next number available
|
||||
from the MessageCounter.
|
||||
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
max_count = 9999
|
||||
lock = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
"""Make this a singleton class."""
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls, *args, **kwargs)
|
||||
cls._instance.val = RawValue("i", 1)
|
||||
cls._instance.lock = threading.Lock()
|
||||
return cls._instance
|
||||
|
||||
def increment(self):
|
||||
with self.lock:
|
||||
if self.val.value == self.max_count:
|
||||
self.val.value = 1
|
||||
else:
|
||||
self.val.value += 1
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
with self.lock:
|
||||
return self.val.value
|
||||
|
||||
def __repr__(self):
|
||||
with self.lock:
|
||||
return str(self.val.value)
|
||||
|
||||
def __str__(self):
|
||||
with self.lock:
|
||||
return str(self.val.value)
|
||||
|
||||
|
||||
class Message(metaclass=abc.ABCMeta):
|
||||
"""Base Message Class."""
|
||||
|
||||
# The message id to send over the air
|
||||
id = 0
|
||||
|
||||
retry_count = 3
|
||||
last_send_time = 0
|
||||
last_send_attempt = 0
|
||||
|
||||
transport = None
|
||||
_raw_message = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
fromcall,
|
||||
tocall,
|
||||
msg_id=None,
|
||||
allow_delay=True,
|
||||
):
|
||||
self.fromcall = fromcall
|
||||
self.tocall = tocall
|
||||
if not msg_id:
|
||||
c = MessageCounter()
|
||||
c.increment()
|
||||
msg_id = c.value
|
||||
self.id = msg_id
|
||||
|
||||
# do we try and save this message for later if we don't get
|
||||
# an ack? Some messages we don't want to do this ever.
|
||||
self.allow_delay = allow_delay
|
||||
|
||||
@abc.abstractmethod
|
||||
def send(self):
|
||||
"""Child class must declare."""
|
||||
|
||||
def _filter_for_send(self):
|
||||
"""Filter and format message string for FCC."""
|
||||
# max? ftm400 displays 64, raw msg shows 74
|
||||
# and ftm400-send is max 64. setting this to
|
||||
# 67 displays 64 on the ftm400. (+3 {01 suffix)
|
||||
# feature req: break long ones into two msgs
|
||||
message = self._raw_message[:67]
|
||||
# We all miss George Carlin
|
||||
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
return self._filter_for_send().rstrip("\n")
|
||||
|
||||
def __str__(self):
|
||||
return self.message
|
||||
|
||||
|
||||
class RawMessage(Message):
|
||||
"""Send a raw message.
|
||||
|
||||
This class is used for custom messages that contain the entire
|
||||
contents of an APRS message in the message field.
|
||||
|
||||
"""
|
||||
|
||||
last_send_age = last_send_time = None
|
||||
|
||||
def __init__(self, message, allow_delay=True):
|
||||
super().__init__(
|
||||
fromcall=None, tocall=None, msg_id=None,
|
||||
allow_delay=allow_delay,
|
||||
)
|
||||
self._raw_message = message
|
||||
|
||||
def dict(self):
|
||||
now = datetime.datetime.now()
|
||||
last_send_age = None
|
||||
if self.last_send_time:
|
||||
last_send_age = str(now - self.last_send_time)
|
||||
return {
|
||||
"type": "raw",
|
||||
"message": self.message,
|
||||
"raw": str(self),
|
||||
"retry_count": self.retry_count,
|
||||
"last_send_attempt": self.last_send_attempt,
|
||||
"last_send_time": str(self.last_send_time),
|
||||
"last_send_age": last_send_age,
|
||||
}
|
||||
|
||||
def send(self):
|
||||
tracker = MsgTrack()
|
||||
tracker.add(self)
|
||||
thread = SendMessageThread(message=self)
|
||||
thread.start()
|
||||
|
||||
def send_direct(self, aprsis_client=None):
|
||||
"""Send a message without a separate thread."""
|
||||
cl = client.factory.create().client
|
||||
log_message(
|
||||
"Sending Message Direct",
|
||||
str(self),
|
||||
self.message,
|
||||
tocall=self.tocall,
|
||||
fromcall=self.fromcall,
|
||||
)
|
||||
cl.send(self)
|
||||
stats.APRSDStats().msgs_tx_inc()
|
||||
|
||||
|
||||
class TextMessage(Message):
|
||||
"""Send regular ARPS text/command messages/replies."""
|
||||
|
||||
last_send_time = last_send_age = None
|
||||
|
||||
def __init__(
|
||||
self, fromcall, tocall, message,
|
||||
msg_id=None, allow_delay=True,
|
||||
):
|
||||
super().__init__(
|
||||
fromcall=fromcall, tocall=tocall,
|
||||
msg_id=msg_id, allow_delay=allow_delay,
|
||||
)
|
||||
self._raw_message = message
|
||||
|
||||
def dict(self):
|
||||
now = datetime.datetime.now()
|
||||
|
||||
last_send_age = None
|
||||
if self.last_send_time:
|
||||
last_send_age = str(now - self.last_send_time)
|
||||
|
||||
return {
|
||||
"id": self.id,
|
||||
"type": "text-message",
|
||||
"fromcall": self.fromcall,
|
||||
"tocall": self.tocall,
|
||||
"message": self.message,
|
||||
"raw": str(self),
|
||||
"retry_count": self.retry_count,
|
||||
"last_send_attempt": self.last_send_attempt,
|
||||
"last_send_time": str(self.last_send_time),
|
||||
"last_send_age": last_send_age,
|
||||
}
|
||||
|
||||
def __str__(self):
|
||||
"""Build raw string to send over the air."""
|
||||
return "{}>APZ100::{}:{}{{{}\n".format(
|
||||
self.fromcall,
|
||||
self.tocall.ljust(9),
|
||||
self.message,
|
||||
str(self.id),
|
||||
)
|
||||
|
||||
def send(self):
|
||||
tracker = MsgTrack()
|
||||
tracker.add(self)
|
||||
LOG.debug(f"Length of MsgTrack is {len(tracker)}")
|
||||
thread = SendMessageThread(message=self)
|
||||
thread.start()
|
||||
|
||||
def send_direct(self, aprsis_client=None):
|
||||
"""Send a message without a separate thread."""
|
||||
if aprsis_client:
|
||||
cl = aprsis_client
|
||||
else:
|
||||
cl = client.factory.create().client
|
||||
log_message(
|
||||
"Sending Message Direct",
|
||||
str(self),
|
||||
self.message,
|
||||
tocall=self.tocall,
|
||||
fromcall=self.fromcall,
|
||||
)
|
||||
cl.send(self)
|
||||
stats.APRSDStats().msgs_tx_inc()
|
||||
packets.PacketList().add(self.dict())
|
||||
|
||||
|
||||
class SendMessageThread(threads.APRSDThread):
|
||||
def __init__(self, message):
|
||||
self.msg = message
|
||||
name = self.msg._raw_message[:5]
|
||||
super().__init__(f"TXPKT-{self.msg.id}-{name}")
|
||||
|
||||
def loop(self):
|
||||
"""Loop until a message is acked or it gets delayed.
|
||||
|
||||
We only sleep for 5 seconds between each loop run, so
|
||||
that CTRL-C can exit the app in a short period. Each sleep
|
||||
means the app quitting is blocked until sleep is done.
|
||||
So we keep track of the last send attempt and only send if the
|
||||
last send attempt is old enough.
|
||||
|
||||
"""
|
||||
tracker = MsgTrack()
|
||||
# lets see if the message is still in the tracking queue
|
||||
msg = tracker.get(self.msg.id)
|
||||
if not msg:
|
||||
# The message has been removed from the tracking queue
|
||||
# So it got acked and we are done.
|
||||
LOG.info("Message Send Complete via Ack.")
|
||||
return False
|
||||
else:
|
||||
send_now = False
|
||||
if msg.last_send_attempt == msg.retry_count:
|
||||
# we reached the send limit, don't send again
|
||||
# TODO(hemna) - Need to put this in a delayed queue?
|
||||
LOG.info("Message Send Complete. Max attempts reached.")
|
||||
if not msg.allow_delay:
|
||||
tracker.remove(msg.id)
|
||||
return False
|
||||
|
||||
# Message is still outstanding and needs to be acked.
|
||||
if msg.last_send_time:
|
||||
# Message has a last send time tracking
|
||||
now = datetime.datetime.now()
|
||||
sleeptime = (msg.last_send_attempt + 1) * 31
|
||||
delta = now - msg.last_send_time
|
||||
if delta > datetime.timedelta(seconds=sleeptime):
|
||||
# It's time to try to send it again
|
||||
send_now = True
|
||||
else:
|
||||
send_now = True
|
||||
|
||||
if send_now:
|
||||
# no attempt time, so lets send it, and start
|
||||
# tracking the time.
|
||||
log_message(
|
||||
"Sending Message",
|
||||
str(msg),
|
||||
msg.message,
|
||||
tocall=self.msg.tocall,
|
||||
retry_number=msg.last_send_attempt,
|
||||
msg_num=msg.id,
|
||||
)
|
||||
cl = client.factory.create().client
|
||||
cl.send(msg)
|
||||
stats.APRSDStats().msgs_tx_inc()
|
||||
packets.PacketList().add(msg.dict())
|
||||
msg.last_send_time = datetime.datetime.now()
|
||||
msg.last_send_attempt += 1
|
||||
|
||||
time.sleep(5)
|
||||
# Make sure we get called again.
|
||||
return True
|
||||
|
||||
|
||||
class AckMessage(Message):
|
||||
"""Class for building Acks and sending them."""
|
||||
|
||||
def __init__(self, fromcall, tocall, msg_id):
|
||||
super().__init__(fromcall, tocall, msg_id=msg_id)
|
||||
|
||||
def dict(self):
|
||||
now = datetime.datetime.now()
|
||||
last_send_age = None
|
||||
if self.last_send_time:
|
||||
last_send_age = str(now - self.last_send_time)
|
||||
return {
|
||||
"id": self.id,
|
||||
"type": "ack",
|
||||
"fromcall": self.fromcall,
|
||||
"tocall": self.tocall,
|
||||
"raw": str(self).rstrip("\n"),
|
||||
"retry_count": self.retry_count,
|
||||
"last_send_attempt": self.last_send_attempt,
|
||||
"last_send_time": str(self.last_send_time),
|
||||
"last_send_age": last_send_age,
|
||||
}
|
||||
|
||||
def __str__(self):
|
||||
return "{}>APZ100::{}:ack{}\n".format(
|
||||
self.fromcall,
|
||||
self.tocall.ljust(9),
|
||||
self.id,
|
||||
)
|
||||
|
||||
def _filter_for_send(self):
|
||||
return f"ack{self.id}"
|
||||
|
||||
def send(self):
|
||||
LOG.debug(f"Send ACK({self.tocall}:{self.id}) to radio.")
|
||||
thread = SendAckThread(self)
|
||||
thread.start()
|
||||
|
||||
def send_direct(self, aprsis_client=None):
|
||||
"""Send an ack message without a separate thread."""
|
||||
if aprsis_client:
|
||||
cl = aprsis_client
|
||||
else:
|
||||
cl = client.factory.create().client
|
||||
log_message(
|
||||
"Sending ack",
|
||||
str(self).rstrip("\n"),
|
||||
None,
|
||||
ack=self.id,
|
||||
tocall=self.tocall,
|
||||
fromcall=self.fromcall,
|
||||
)
|
||||
cl.send(self)
|
||||
|
||||
|
||||
class SendAckThread(threads.APRSDThread):
|
||||
def __init__(self, ack):
|
||||
self.ack = ack
|
||||
super().__init__(f"SendAck-{self.ack.id}")
|
||||
|
||||
def loop(self):
|
||||
"""Separate thread to send acks with retries."""
|
||||
send_now = False
|
||||
if self.ack.last_send_attempt == self.ack.retry_count:
|
||||
# we reached the send limit, don't send again
|
||||
# TODO(hemna) - Need to put this in a delayed queue?
|
||||
LOG.info("Ack Send Complete. Max attempts reached.")
|
||||
return False
|
||||
|
||||
if self.ack.last_send_time:
|
||||
# Message has a last send time tracking
|
||||
now = datetime.datetime.now()
|
||||
|
||||
# aprs duplicate detection is 30 secs?
|
||||
# (21 only sends first, 28 skips middle)
|
||||
sleeptime = 31
|
||||
delta = now - self.ack.last_send_time
|
||||
if delta > datetime.timedelta(seconds=sleeptime):
|
||||
# It's time to try to send it again
|
||||
send_now = True
|
||||
else:
|
||||
LOG.debug(f"Still wating. {delta}")
|
||||
else:
|
||||
send_now = True
|
||||
|
||||
if send_now:
|
||||
cl = client.factory.create().client
|
||||
log_message(
|
||||
"Sending ack",
|
||||
str(self.ack).rstrip("\n"),
|
||||
None,
|
||||
ack=self.ack.id,
|
||||
tocall=self.ack.tocall,
|
||||
retry_number=self.ack.last_send_attempt,
|
||||
)
|
||||
cl.send(self.ack)
|
||||
stats.APRSDStats().ack_tx_inc()
|
||||
packets.PacketList().add(self.ack.dict())
|
||||
self.ack.last_send_attempt += 1
|
||||
self.ack.last_send_time = datetime.datetime.now()
|
||||
time.sleep(5)
|
||||
return True
|
||||
|
||||
|
||||
def log_packet(packet):
|
||||
fromcall = packet.get("from", None)
|
||||
tocall = packet.get("to", None)
|
||||
|
||||
response_type = packet.get("response", None)
|
||||
msg = packet.get("message_text", None)
|
||||
msg_num = packet.get("msgNo", None)
|
||||
ack = packet.get("ack", None)
|
||||
|
||||
log_message(
|
||||
"Packet", packet["raw"], msg, fromcall=fromcall, tocall=tocall,
|
||||
ack=ack, packet_type=response_type, msg_num=msg_num, )
|
||||
|
||||
|
||||
def log_message(
|
||||
header, raw, message, tocall=None, fromcall=None, msg_num=None,
|
||||
retry_number=None, ack=None, packet_type=None, uuid=None,
|
||||
console=None,
|
||||
):
|
||||
"""
|
||||
|
||||
Log a message entry.
|
||||
|
||||
This builds a long string with newlines for the log entry, so that
|
||||
it's thread safe. If we log each item as a separate log.debug() call
|
||||
Then the message information could get multiplexed with other log
|
||||
messages. Each python log call is automatically synchronized.
|
||||
|
||||
|
||||
"""
|
||||
|
||||
log_list = [""]
|
||||
if retry_number:
|
||||
log_list.append(f"{header} _______________(TX:{retry_number})")
|
||||
else:
|
||||
log_list.append(f"{header} _______________")
|
||||
|
||||
log_list.append(f" Raw : {raw}")
|
||||
|
||||
if packet_type:
|
||||
log_list.append(f" Packet : {packet_type}")
|
||||
if tocall:
|
||||
log_list.append(f" To : {tocall}")
|
||||
if fromcall:
|
||||
log_list.append(f" From : {fromcall}")
|
||||
|
||||
if ack:
|
||||
log_list.append(f" Ack : {ack}")
|
||||
else:
|
||||
log_list.append(f" Message : {message}")
|
||||
if msg_num:
|
||||
log_list.append(f" Msg # : {msg_num}")
|
||||
if uuid:
|
||||
log_list.append(f" UUID : {uuid}")
|
||||
log_list.append(f"{header} _______________ Complete")
|
||||
|
||||
if console:
|
||||
console.log("\n".join(log_list))
|
||||
else:
|
||||
LOG.info("\n".join(log_list))
|
||||
|
221
aprsd/packets.py
221
aprsd/packets.py
@ -1,221 +0,0 @@
|
||||
import datetime
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import wrapt
|
||||
|
||||
from aprsd import utils
|
||||
from aprsd.utils import objectstore
|
||||
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
|
||||
PACKET_TYPE_MESSAGE = "message"
|
||||
PACKET_TYPE_ACK = "ack"
|
||||
PACKET_TYPE_MICE = "mic-e"
|
||||
|
||||
|
||||
class PacketList:
|
||||
"""Class to track all of the packets rx'd and tx'd by aprsd."""
|
||||
|
||||
_instance = None
|
||||
lock = threading.Lock()
|
||||
config = None
|
||||
|
||||
packet_list = {}
|
||||
|
||||
total_recv = 0
|
||||
total_tx = 0
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance.packet_list = utils.RingBuffer(1000)
|
||||
cls._instance.config = kwargs["config"]
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, config=None):
|
||||
if config:
|
||||
self.config = config
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def __iter__(self):
|
||||
return iter(self.packet_list)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def add(self, packet):
|
||||
packet["ts"] = time.time()
|
||||
if (
|
||||
"fromcall" in packet
|
||||
and packet["fromcall"] == self.config["aprs"]["login"]
|
||||
):
|
||||
self.total_tx += 1
|
||||
else:
|
||||
self.total_recv += 1
|
||||
self.packet_list.append(packet)
|
||||
SeenList().update_seen(packet)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def get(self):
|
||||
return self.packet_list.get()
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def total_received(self):
|
||||
return self.total_recv
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def total_sent(self):
|
||||
return self.total_tx
|
||||
|
||||
|
||||
class WatchList(objectstore.ObjectStoreMixin):
|
||||
"""Global watch list and info for callsigns."""
|
||||
|
||||
_instance = None
|
||||
lock = threading.Lock()
|
||||
data = {}
|
||||
config = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
if "config" in kwargs:
|
||||
cls._instance.config = kwargs["config"]
|
||||
cls._instance._init_store()
|
||||
cls._instance.data = {}
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, config=None):
|
||||
if config:
|
||||
self.config = config
|
||||
|
||||
ring_size = config["aprsd"]["watch_list"].get("packet_keep_count", 10)
|
||||
|
||||
for callsign in config["aprsd"]["watch_list"].get("callsigns", []):
|
||||
call = callsign.replace("*", "")
|
||||
# FIXME(waboring) - we should fetch the last time we saw
|
||||
# a beacon from a callsign or some other mechanism to find
|
||||
# last time a message was seen by aprs-is. For now this
|
||||
# is all we can do.
|
||||
self.data[call] = {
|
||||
"last": datetime.datetime.now(),
|
||||
"packets": utils.RingBuffer(
|
||||
ring_size,
|
||||
),
|
||||
}
|
||||
|
||||
def is_enabled(self):
|
||||
if self.config and "watch_list" in self.config["aprsd"]:
|
||||
return self.config["aprsd"]["watch_list"].get("enabled", False)
|
||||
else:
|
||||
return False
|
||||
|
||||
def callsign_in_watchlist(self, callsign):
|
||||
return callsign in self.data
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def update_seen(self, packet):
|
||||
callsign = packet["from"]
|
||||
if self.callsign_in_watchlist(callsign):
|
||||
self.data[callsign]["last"] = datetime.datetime.now()
|
||||
self.data[callsign]["packets"].append(packet)
|
||||
|
||||
def last_seen(self, callsign):
|
||||
if self.callsign_in_watchlist(callsign):
|
||||
return self.data[callsign]["last"]
|
||||
|
||||
def age(self, callsign):
|
||||
now = datetime.datetime.now()
|
||||
return str(now - self.last_seen(callsign))
|
||||
|
||||
def max_delta(self, seconds=None):
|
||||
watch_list_conf = self.config["aprsd"]["watch_list"]
|
||||
if not seconds:
|
||||
seconds = watch_list_conf["alert_time_seconds"]
|
||||
max_timeout = {"seconds": seconds}
|
||||
return datetime.timedelta(**max_timeout)
|
||||
|
||||
def is_old(self, callsign, seconds=None):
|
||||
"""Watch list callsign last seen is old compared to now?
|
||||
|
||||
This tests to see if the last time we saw a callsign packet,
|
||||
if that is older than the allowed timeout in the config.
|
||||
|
||||
We put this here so any notification plugin can use this
|
||||
same test.
|
||||
"""
|
||||
age = self.age(callsign)
|
||||
|
||||
delta = utils.parse_delta_str(age)
|
||||
d = datetime.timedelta(**delta)
|
||||
|
||||
max_delta = self.max_delta(seconds=seconds)
|
||||
|
||||
if d > max_delta:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
class SeenList(objectstore.ObjectStoreMixin):
|
||||
"""Global callsign seen list."""
|
||||
|
||||
_instance = None
|
||||
lock = threading.Lock()
|
||||
data = {}
|
||||
config = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
if "config" in kwargs:
|
||||
cls._instance.config = kwargs["config"]
|
||||
cls._instance._init_store()
|
||||
cls._instance.data = {}
|
||||
return cls._instance
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def update_seen(self, packet):
|
||||
callsign = None
|
||||
if "fromcall" in packet:
|
||||
callsign = packet["fromcall"]
|
||||
elif "from" in packet:
|
||||
callsign = packet["from"]
|
||||
else:
|
||||
LOG.warning(f"Can't find FROM in packet {packet}")
|
||||
return
|
||||
if callsign not in self.data:
|
||||
self.data[callsign] = {
|
||||
"last": None,
|
||||
"count": 0,
|
||||
}
|
||||
self.data[callsign]["last"] = str(datetime.datetime.now())
|
||||
self.data[callsign]["count"] += 1
|
||||
|
||||
|
||||
def get_packet_type(packet):
|
||||
"""Decode the packet type from the packet."""
|
||||
|
||||
msg_format = packet.get("format", None)
|
||||
msg_response = packet.get("response", None)
|
||||
packet_type = "unknown"
|
||||
if msg_format == "message":
|
||||
packet_type = PACKET_TYPE_MESSAGE
|
||||
elif msg_response == "ack":
|
||||
packet_type = PACKET_TYPE_ACK
|
||||
elif msg_format == "mic-e":
|
||||
packet_type = PACKET_TYPE_MICE
|
||||
return packet_type
|
||||
|
||||
|
||||
def is_message_packet(packet):
|
||||
return get_packet_type(packet) == PACKET_TYPE_MESSAGE
|
||||
|
||||
|
||||
def is_ack_packet(packet):
|
||||
return get_packet_type(packet) == PACKET_TYPE_ACK
|
||||
|
||||
|
||||
def is_mice_packet(packet):
|
||||
return get_packet_type(packet) == PACKET_TYPE_MICE
|
8
aprsd/packets/__init__.py
Normal file
8
aprsd/packets/__init__.py
Normal file
@ -0,0 +1,8 @@
|
||||
from aprsd.packets.core import (
|
||||
AckPacket, GPSPacket, MessagePacket, MicEPacket, Packet, PathPacket,
|
||||
StatusPacket, WeatherPacket,
|
||||
)
|
||||
from aprsd.packets.packet_list import PacketList
|
||||
from aprsd.packets.seen_list import SeenList
|
||||
from aprsd.packets.tracker import PacketTrack
|
||||
from aprsd.packets.watch_list import WatchList
|
320
aprsd/packets/core.py
Normal file
320
aprsd/packets/core.py
Normal file
@ -0,0 +1,320 @@
|
||||
import abc
|
||||
from dataclasses import asdict, dataclass, field
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
# Due to a failure in python 3.8
|
||||
from typing import List
|
||||
|
||||
import dacite
|
||||
|
||||
from aprsd import client, stats
|
||||
from aprsd.threads import tx
|
||||
from aprsd.utils import counter
|
||||
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
|
||||
PACKET_TYPE_MESSAGE = "message"
|
||||
PACKET_TYPE_ACK = "ack"
|
||||
PACKET_TYPE_MICE = "mic-e"
|
||||
PACKET_TYPE_WX = "weather"
|
||||
PACKET_TYPE_UNKNOWN = "unknown"
|
||||
PACKET_TYPE_STATUS = "status"
|
||||
PACKET_TYPE_BEACON = "beacon"
|
||||
PACKET_TYPE_UNCOMPRESSED = "uncompressed"
|
||||
|
||||
|
||||
@dataclass()
|
||||
class Packet(metaclass=abc.ABCMeta):
|
||||
from_call: str
|
||||
to_call: str
|
||||
addresse: str = None
|
||||
format: str = None
|
||||
msgNo: str = None # noqa: N815
|
||||
packet_type: str = None
|
||||
timestamp: float = field(default_factory=time.time)
|
||||
raw: str = None
|
||||
_raw_dict: dict = field(repr=False, default_factory=lambda: {})
|
||||
_retry_count = 3
|
||||
_last_send_time = 0
|
||||
_last_send_attempt = 0
|
||||
# Do we allow this packet to be saved to send later?
|
||||
_allow_delay = True
|
||||
|
||||
_transport = None
|
||||
_raw_message = None
|
||||
|
||||
def get(self, key, default=None):
|
||||
"""Emulate a getter on a dict."""
|
||||
if hasattr(self, key):
|
||||
return getattr(self, key)
|
||||
else:
|
||||
return default
|
||||
|
||||
def _init_for_send(self):
|
||||
"""Do stuff here that is needed prior to sending over the air."""
|
||||
if not self.msgNo:
|
||||
c = counter.PacketCounter()
|
||||
c.increment()
|
||||
self.msgNo = c.value
|
||||
|
||||
# now build the raw message for sending
|
||||
self._build_raw()
|
||||
|
||||
def _build_raw(self):
|
||||
"""Build the self.raw string which is what is sent over the air."""
|
||||
self.raw = self._filter_for_send().rstrip("\n")
|
||||
|
||||
@staticmethod
|
||||
def factory(raw_packet):
|
||||
raw = raw_packet
|
||||
raw["_raw_dict"] = raw.copy()
|
||||
translate_fields = {
|
||||
"from": "from_call",
|
||||
"to": "to_call",
|
||||
}
|
||||
# First translate some fields
|
||||
for key in translate_fields:
|
||||
if key in raw:
|
||||
raw[translate_fields[key]] = raw[key]
|
||||
del raw[key]
|
||||
|
||||
if "addresse" in raw:
|
||||
raw["to_call"] = raw["addresse"]
|
||||
|
||||
packet_type = get_packet_type(raw)
|
||||
raw["packet_type"] = packet_type
|
||||
class_name = TYPE_LOOKUP[packet_type]
|
||||
if packet_type == PACKET_TYPE_UNKNOWN:
|
||||
# Try and figure it out here
|
||||
if "latitude" in raw:
|
||||
class_name = GPSPacket
|
||||
|
||||
if packet_type == PACKET_TYPE_WX:
|
||||
# the weather information is in a dict
|
||||
# this brings those values out to the outer dict
|
||||
for key in raw["weather"]:
|
||||
raw[key] = raw["weather"][key]
|
||||
|
||||
return dacite.from_dict(data_class=class_name, data=raw)
|
||||
|
||||
def log(self, header=None):
|
||||
"""LOG a packet to the logfile."""
|
||||
asdict(self)
|
||||
log_list = ["\n"]
|
||||
if header:
|
||||
if isinstance(self, AckPacket):
|
||||
log_list.append(
|
||||
f"{header} ___________"
|
||||
f"(TX:{self._send_count} of {self._retry_count})",
|
||||
)
|
||||
else:
|
||||
log_list.append(f"{header} _______________")
|
||||
log_list.append(f" Packet : {self.__class__.__name__}")
|
||||
log_list.append(f" Raw : {self.raw}")
|
||||
if self.to_call:
|
||||
log_list.append(f" To : {self.to_call}")
|
||||
if self.from_call:
|
||||
log_list.append(f" From : {self.from_call}")
|
||||
if hasattr(self, "path") and self.path:
|
||||
log_list.append(f" Path : {'=>'.join(self.path)}")
|
||||
if hasattr(self, "via") and self.via:
|
||||
log_list.append(f" VIA : {self.via}")
|
||||
|
||||
elif isinstance(self, MessagePacket):
|
||||
log_list.append(f" Message : {self.message_text}")
|
||||
|
||||
if hasattr(self, "comment") and self.comment:
|
||||
log_list.append(f" Comment : {self.comment}")
|
||||
|
||||
if self.msgNo:
|
||||
log_list.append(f" Msg # : {self.msgNo}")
|
||||
log_list.append(f"{header} _______________ Complete")
|
||||
|
||||
LOG.info("\n".join(log_list))
|
||||
LOG.debug(self)
|
||||
|
||||
def _filter_for_send(self) -> str:
|
||||
"""Filter and format message string for FCC."""
|
||||
# max? ftm400 displays 64, raw msg shows 74
|
||||
# and ftm400-send is max 64. setting this to
|
||||
# 67 displays 64 on the ftm400. (+3 {01 suffix)
|
||||
# feature req: break long ones into two msgs
|
||||
message = self.raw[:67]
|
||||
# We all miss George Carlin
|
||||
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
|
||||
|
||||
def send(self):
|
||||
"""Method to send a packet."""
|
||||
LOG.warning("send() called!")
|
||||
self._init_for_send()
|
||||
thread = tx.SendPacketThread(packet=self)
|
||||
LOG.warning(f"Starting thread to TX {self}")
|
||||
thread.start()
|
||||
LOG.warning("Thread started")
|
||||
|
||||
def send_direct(self, aprsis_client=None):
|
||||
"""Send the message in the same thread as caller."""
|
||||
self._init_for_send()
|
||||
if aprsis_client:
|
||||
cl = aprsis_client
|
||||
else:
|
||||
cl = client.factory.create().client
|
||||
self.log(header="Sending Message Direct")
|
||||
cl.send(self.raw)
|
||||
stats.APRSDStats().msgs_tx_inc()
|
||||
|
||||
|
||||
@dataclass()
|
||||
class PathPacket(Packet):
|
||||
path: List[str] = field(default_factory=list)
|
||||
via: str = None
|
||||
|
||||
def _build_raw(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@dataclass()
|
||||
class AckPacket(PathPacket):
|
||||
response: str = None
|
||||
_send_count = 1
|
||||
|
||||
def _build_raw(self):
|
||||
"""Build the self.raw which is what is sent over the air."""
|
||||
self.raw = "{}>APZ100::{}:ack{}".format(
|
||||
self.from_call,
|
||||
self.to_call.ljust(9),
|
||||
self.msgNo,
|
||||
)
|
||||
|
||||
def send(self):
|
||||
"""Method to send a packet."""
|
||||
self._init_for_send()
|
||||
thread = tx.SendAckThread(packet=self)
|
||||
LOG.warning(f"Starting thread to TXACK {self}")
|
||||
thread.start()
|
||||
|
||||
|
||||
@dataclass()
|
||||
class MessagePacket(PathPacket):
|
||||
message_text: str = None
|
||||
|
||||
def _filter_for_send(self) -> str:
|
||||
"""Filter and format message string for FCC."""
|
||||
# max? ftm400 displays 64, raw msg shows 74
|
||||
# and ftm400-send is max 64. setting this to
|
||||
# 67 displays 64 on the ftm400. (+3 {01 suffix)
|
||||
# feature req: break long ones into two msgs
|
||||
message = self.message_text[:67]
|
||||
# We all miss George Carlin
|
||||
return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
|
||||
|
||||
def _build_raw(self):
|
||||
"""Build the self.raw which is what is sent over the air."""
|
||||
self.raw = "{}>APZ100::{}:{}{{{}".format(
|
||||
self.from_call,
|
||||
self.to_call.ljust(9),
|
||||
self._filter_for_send().rstrip("\n"),
|
||||
str(self.msgNo),
|
||||
)
|
||||
|
||||
|
||||
@dataclass()
|
||||
class StatusPacket(PathPacket):
|
||||
status: str = None
|
||||
timestamp: int = 0
|
||||
messagecapable: bool = False
|
||||
comment: str = None
|
||||
|
||||
def _build_raw(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@dataclass()
|
||||
class GPSPacket(PathPacket):
|
||||
latitude: float = 0.00
|
||||
longitude: float = 0.00
|
||||
altitude: float = 0.00
|
||||
rng: float = 0.00
|
||||
posambiguity: int = 0
|
||||
timestamp: int = 0
|
||||
comment: str = None
|
||||
symbol: str = None
|
||||
symbol_table: str = None
|
||||
speed: float = 0.00
|
||||
course: int = 0
|
||||
|
||||
def _build_raw(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@dataclass()
|
||||
class MicEPacket(GPSPacket):
|
||||
messagecapable: bool = False
|
||||
mbits: str = None
|
||||
mtype: str = None
|
||||
|
||||
def _build_raw(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@dataclass()
|
||||
class WeatherPacket(GPSPacket):
|
||||
symbol: str = "_"
|
||||
wind_gust: float = 0.00
|
||||
temperature: float = 0.00
|
||||
rain_1h: float = 0.00
|
||||
rain_24h: float = 0.00
|
||||
rain_since_midnight: float = 0.00
|
||||
humidity: int = 0
|
||||
pressure: float = 0.00
|
||||
comment: str = None
|
||||
|
||||
def _build_raw(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
TYPE_LOOKUP = {
|
||||
PACKET_TYPE_WX: WeatherPacket,
|
||||
PACKET_TYPE_MESSAGE: MessagePacket,
|
||||
PACKET_TYPE_ACK: AckPacket,
|
||||
PACKET_TYPE_MICE: MicEPacket,
|
||||
PACKET_TYPE_STATUS: StatusPacket,
|
||||
PACKET_TYPE_BEACON: GPSPacket,
|
||||
PACKET_TYPE_UNKNOWN: Packet,
|
||||
}
|
||||
|
||||
|
||||
def get_packet_type(packet: dict):
|
||||
"""Decode the packet type from the packet."""
|
||||
|
||||
pkt_format = packet.get("format", None)
|
||||
msg_response = packet.get("response", None)
|
||||
packet_type = "unknown"
|
||||
if pkt_format == "message" and msg_response == "ack":
|
||||
packet_type = PACKET_TYPE_ACK
|
||||
elif pkt_format == "message":
|
||||
packet_type = PACKET_TYPE_MESSAGE
|
||||
elif pkt_format == "mic-e":
|
||||
packet_type = PACKET_TYPE_MICE
|
||||
elif pkt_format == "status":
|
||||
packet_type = PACKET_TYPE_STATUS
|
||||
elif pkt_format == PACKET_TYPE_BEACON:
|
||||
packet_type = PACKET_TYPE_BEACON
|
||||
elif pkt_format == PACKET_TYPE_UNCOMPRESSED:
|
||||
if packet.get("symbol", None) == "_":
|
||||
packet_type = PACKET_TYPE_WX
|
||||
return packet_type
|
||||
|
||||
|
||||
def is_message_packet(packet):
|
||||
return get_packet_type(packet) == PACKET_TYPE_MESSAGE
|
||||
|
||||
|
||||
def is_ack_packet(packet):
|
||||
return get_packet_type(packet) == PACKET_TYPE_ACK
|
||||
|
||||
|
||||
def is_mice_packet(packet):
|
||||
return get_packet_type(packet) == PACKET_TYPE_MICE
|
60
aprsd/packets/packet_list.py
Normal file
60
aprsd/packets/packet_list.py
Normal file
@ -0,0 +1,60 @@
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import wrapt
|
||||
|
||||
from aprsd import utils
|
||||
from aprsd.packets import seen_list
|
||||
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
|
||||
|
||||
class PacketList:
|
||||
"""Class to track all of the packets rx'd and tx'd by aprsd."""
|
||||
|
||||
_instance = None
|
||||
lock = threading.Lock()
|
||||
config = None
|
||||
|
||||
packet_list = utils.RingBuffer(1000)
|
||||
|
||||
total_recv = 0
|
||||
total_tx = 0
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance.config = kwargs["config"]
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, config=None):
|
||||
if config:
|
||||
self.config = config
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def __iter__(self):
|
||||
return iter(self.packet_list)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def add(self, packet):
|
||||
packet.ts = time.time()
|
||||
if packet.from_call == self.config["aprs"]["login"]:
|
||||
self.total_tx += 1
|
||||
else:
|
||||
self.total_recv += 1
|
||||
self.packet_list.append(packet)
|
||||
seen_list.SeenList().update_seen(packet)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def get(self):
|
||||
return self.packet_list.get()
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def total_received(self):
|
||||
return self.total_recv
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def total_sent(self):
|
||||
return self.total_tx
|
44
aprsd/packets/seen_list.py
Normal file
44
aprsd/packets/seen_list.py
Normal file
@ -0,0 +1,44 @@
|
||||
import datetime
|
||||
import logging
|
||||
import threading
|
||||
|
||||
import wrapt
|
||||
|
||||
from aprsd.utils import objectstore
|
||||
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
|
||||
|
||||
class SeenList(objectstore.ObjectStoreMixin):
|
||||
"""Global callsign seen list."""
|
||||
|
||||
_instance = None
|
||||
lock = threading.Lock()
|
||||
data = {}
|
||||
config = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
if "config" in kwargs:
|
||||
cls._instance.config = kwargs["config"]
|
||||
cls._instance._init_store()
|
||||
cls._instance.data = {}
|
||||
return cls._instance
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def update_seen(self, packet):
|
||||
callsign = None
|
||||
if packet.from_call:
|
||||
callsign = packet.from_call
|
||||
else:
|
||||
LOG.warning(f"Can't find FROM in packet {packet}")
|
||||
return
|
||||
if callsign not in self.data:
|
||||
self.data[callsign] = {
|
||||
"last": None,
|
||||
"count": 0,
|
||||
}
|
||||
self.data[callsign]["last"] = str(datetime.datetime.now())
|
||||
self.data[callsign]["count"] += 1
|
116
aprsd/packets/tracker.py
Normal file
116
aprsd/packets/tracker.py
Normal file
@ -0,0 +1,116 @@
|
||||
import datetime
|
||||
import threading
|
||||
|
||||
import wrapt
|
||||
|
||||
from aprsd import stats
|
||||
from aprsd.utils import objectstore
|
||||
|
||||
|
||||
class PacketTrack(objectstore.ObjectStoreMixin):
|
||||
"""Class to keep track of outstanding text messages.
|
||||
|
||||
This is a thread safe class that keeps track of active
|
||||
messages.
|
||||
|
||||
When a message is asked to be sent, it is placed into this
|
||||
class via it's id. The TextMessage class's send() method
|
||||
automatically adds itself to this class. When the ack is
|
||||
recieved from the radio, the message object is removed from
|
||||
this class.
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
_start_time = None
|
||||
lock = threading.Lock()
|
||||
|
||||
data = {}
|
||||
total_tracked = 0
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._start_time = datetime.datetime.now()
|
||||
cls._instance.config = kwargs["config"]
|
||||
cls._instance._init_store()
|
||||
return cls._instance
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def __getitem__(self, name):
|
||||
return self.data[name]
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def __iter__(self):
|
||||
return iter(self.data)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def keys(self):
|
||||
return self.data.keys()
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def items(self):
|
||||
return self.data.items()
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def values(self):
|
||||
return self.data.values()
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def __len__(self):
|
||||
return len(self.data)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def __str__(self):
|
||||
result = "{"
|
||||
for key in self.data.keys():
|
||||
result += f"{key}: {str(self.data[key])}, "
|
||||
result += "}"
|
||||
return result
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def add(self, packet):
|
||||
key = int(packet.msgNo)
|
||||
self.data[key] = packet
|
||||
stats.APRSDStats().msgs_tracked_inc()
|
||||
self.total_tracked += 1
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def get(self, id):
|
||||
if id in self.data:
|
||||
return self.data[id]
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def remove(self, id):
|
||||
key = int(id)
|
||||
if key in self.data.keys():
|
||||
del self.data[key]
|
||||
|
||||
def restart(self):
|
||||
"""Walk the list of messages and restart them if any."""
|
||||
for key in self.data.keys():
|
||||
pkt = self.data[key]
|
||||
if pkt.last_send_attempt < pkt.retry_count:
|
||||
pkt.send()
|
||||
|
||||
def _resend(self, packet):
|
||||
packet._last_send_attempt = 0
|
||||
packet.send()
|
||||
|
||||
def restart_delayed(self, count=None, most_recent=True):
|
||||
"""Walk the list of delayed messages and restart them if any."""
|
||||
if not count:
|
||||
# Send all the delayed messages
|
||||
for key in self.data.keys():
|
||||
pkt = self.data[key]
|
||||
if pkt._last_send_attempt == pkt._retry_count:
|
||||
self._resend(pkt)
|
||||
else:
|
||||
# They want to resend <count> delayed messages
|
||||
tmp = sorted(
|
||||
self.data.items(),
|
||||
reverse=most_recent,
|
||||
key=lambda x: x[1].last_send_time,
|
||||
)
|
||||
pkt_list = tmp[:count]
|
||||
for (_key, pkt) in pkt_list:
|
||||
self._resend(pkt)
|
103
aprsd/packets/watch_list.py
Normal file
103
aprsd/packets/watch_list.py
Normal file
@ -0,0 +1,103 @@
|
||||
import datetime
|
||||
import logging
|
||||
import threading
|
||||
|
||||
import wrapt
|
||||
|
||||
from aprsd import utils
|
||||
from aprsd.utils import objectstore
|
||||
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
|
||||
|
||||
class WatchList(objectstore.ObjectStoreMixin):
|
||||
"""Global watch list and info for callsigns."""
|
||||
|
||||
_instance = None
|
||||
lock = threading.Lock()
|
||||
data = {}
|
||||
config = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
if "config" in kwargs:
|
||||
cls._instance.config = kwargs["config"]
|
||||
cls._instance._init_store()
|
||||
cls._instance.data = {}
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, config=None):
|
||||
if config:
|
||||
self.config = config
|
||||
|
||||
ring_size = config["aprsd"]["watch_list"].get("packet_keep_count", 10)
|
||||
|
||||
for callsign in config["aprsd"]["watch_list"].get("callsigns", []):
|
||||
call = callsign.replace("*", "")
|
||||
# FIXME(waboring) - we should fetch the last time we saw
|
||||
# a beacon from a callsign or some other mechanism to find
|
||||
# last time a message was seen by aprs-is. For now this
|
||||
# is all we can do.
|
||||
self.data[call] = {
|
||||
"last": datetime.datetime.now(),
|
||||
"packets": utils.RingBuffer(
|
||||
ring_size,
|
||||
),
|
||||
}
|
||||
|
||||
def is_enabled(self):
|
||||
if self.config and "watch_list" in self.config["aprsd"]:
|
||||
return self.config["aprsd"]["watch_list"].get("enabled", False)
|
||||
else:
|
||||
return False
|
||||
|
||||
def callsign_in_watchlist(self, callsign):
|
||||
return callsign in self.data
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def update_seen(self, packet):
|
||||
if packet.addresse:
|
||||
callsign = packet.addresse
|
||||
else:
|
||||
callsign = packet.from_call
|
||||
if self.callsign_in_watchlist(callsign):
|
||||
self.data[callsign]["last"] = datetime.datetime.now()
|
||||
self.data[callsign]["packets"].append(packet)
|
||||
|
||||
def last_seen(self, callsign):
|
||||
if self.callsign_in_watchlist(callsign):
|
||||
return self.data[callsign]["last"]
|
||||
|
||||
def age(self, callsign):
|
||||
now = datetime.datetime.now()
|
||||
return str(now - self.last_seen(callsign))
|
||||
|
||||
def max_delta(self, seconds=None):
|
||||
watch_list_conf = self.config["aprsd"]["watch_list"]
|
||||
if not seconds:
|
||||
seconds = watch_list_conf["alert_time_seconds"]
|
||||
max_timeout = {"seconds": seconds}
|
||||
return datetime.timedelta(**max_timeout)
|
||||
|
||||
def is_old(self, callsign, seconds=None):
|
||||
"""Watch list callsign last seen is old compared to now?
|
||||
|
||||
This tests to see if the last time we saw a callsign packet,
|
||||
if that is older than the allowed timeout in the config.
|
||||
|
||||
We put this here so any notification plugin can use this
|
||||
same test.
|
||||
"""
|
||||
age = self.age(callsign)
|
||||
|
||||
delta = utils.parse_delta_str(age)
|
||||
d = datetime.timedelta(**delta)
|
||||
|
||||
max_delta = self.max_delta(seconds=seconds)
|
||||
|
||||
if d > max_delta:
|
||||
return True
|
||||
else:
|
||||
return False
|
@ -13,7 +13,8 @@ import pluggy
|
||||
from thesmuggler import smuggle
|
||||
|
||||
import aprsd
|
||||
from aprsd import client, messaging, packets, threads
|
||||
from aprsd import client, messaging, threads
|
||||
from aprsd.packets import watch_list
|
||||
|
||||
|
||||
# setup the global logger
|
||||
@ -163,8 +164,8 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
|
||||
def filter(self, packet):
|
||||
result = messaging.NULL_MESSAGE
|
||||
if self.enabled:
|
||||
wl = packets.WatchList()
|
||||
if wl.callsign_in_watchlist(packet["from"]):
|
||||
wl = watch_list.WatchList()
|
||||
if wl.callsign_in_watchlist(packet.from_call):
|
||||
# packet is from a callsign in the watch list
|
||||
self.rx_inc()
|
||||
try:
|
||||
@ -275,7 +276,7 @@ class HelpPlugin(APRSDRegexCommandPluginBase):
|
||||
def process(self, packet):
|
||||
LOG.info("HelpPlugin")
|
||||
# fromcall = packet.get("from")
|
||||
message = packet.get("message_text", None)
|
||||
message = packet.message_text
|
||||
# ack = packet.get("msgNo", "0")
|
||||
a = re.search(r"^.*\s+(.*)", message)
|
||||
command_name = None
|
||||
|
@ -10,7 +10,7 @@ import time
|
||||
|
||||
import imapclient
|
||||
|
||||
from aprsd import messaging, plugin, stats, threads
|
||||
from aprsd import messaging, packets, plugin, stats, threads
|
||||
from aprsd.utils import trace
|
||||
|
||||
|
||||
@ -85,14 +85,14 @@ class EmailPlugin(plugin.APRSDRegexCommandPluginBase):
|
||||
)
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("Email COMMAND")
|
||||
if not self.enabled:
|
||||
# Email has not been enabled
|
||||
# so the plugin will just NOOP
|
||||
return messaging.NULL_MESSAGE
|
||||
|
||||
fromcall = packet.get("from")
|
||||
fromcall = packet.from_call
|
||||
message = packet.get("message_text", None)
|
||||
ack = packet.get("msgNo", "0")
|
||||
|
||||
|
@ -2,7 +2,7 @@ import logging
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
from aprsd import plugin
|
||||
from aprsd import packets, plugin
|
||||
from aprsd.utils import trace
|
||||
|
||||
|
||||
@ -26,7 +26,7 @@ class FortunePlugin(plugin.APRSDRegexCommandPluginBase):
|
||||
self.enabled = True
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("FortunePlugin")
|
||||
|
||||
# fromcall = packet.get("from")
|
||||
|
@ -2,7 +2,7 @@ import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
from aprsd import plugin, plugin_utils
|
||||
from aprsd import packets, plugin, plugin_utils
|
||||
from aprsd.utils import trace
|
||||
|
||||
|
||||
@ -20,9 +20,9 @@ class LocationPlugin(plugin.APRSDRegexCommandPluginBase, plugin.APRSFIKEYMixin):
|
||||
self.ensure_aprs_fi_key()
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("Location Plugin")
|
||||
fromcall = packet.get("from")
|
||||
fromcall = packet.from_call
|
||||
message = packet.get("message_text", None)
|
||||
# ack = packet.get("msgNo", "0")
|
||||
|
||||
|
@ -1,7 +1,6 @@
|
||||
import logging
|
||||
|
||||
from aprsd import messaging, packets, plugin
|
||||
from aprsd.utils import trace
|
||||
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
@ -18,17 +17,16 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase):
|
||||
|
||||
short_description = "Notify me when a CALLSIGN is recently seen on APRS-IS"
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("NotifySeenPlugin")
|
||||
|
||||
notify_callsign = self.config["aprsd"]["watch_list"]["alert_callsign"]
|
||||
fromcall = packet.get("from")
|
||||
fromcall = packet.from_call
|
||||
|
||||
wl = packets.WatchList()
|
||||
age = wl.age(fromcall)
|
||||
|
||||
if wl.is_old(packet["from"]):
|
||||
if wl.is_old(fromcall):
|
||||
LOG.info(
|
||||
"NOTIFY {} last seen {} max age={}".format(
|
||||
fromcall,
|
||||
@ -36,7 +34,7 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase):
|
||||
wl.max_delta(),
|
||||
),
|
||||
)
|
||||
packet_type = packets.get_packet_type(packet)
|
||||
packet_type = packet.packet_type
|
||||
# we shouldn't notify the alert user that they are online.
|
||||
if fromcall != notify_callsign:
|
||||
msg = messaging.TextMessage(
|
||||
|
@ -2,7 +2,7 @@ import datetime
|
||||
import logging
|
||||
import re
|
||||
|
||||
from aprsd import messaging, plugin
|
||||
from aprsd import messaging, packets, plugin
|
||||
from aprsd.utils import trace
|
||||
|
||||
|
||||
@ -17,10 +17,10 @@ class QueryPlugin(plugin.APRSDRegexCommandPluginBase):
|
||||
short_description = "APRSD Owner command to query messages in the MsgTrack"
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
LOG.info("Query COMMAND")
|
||||
|
||||
fromcall = packet.get("from")
|
||||
fromcall = packet.from_call
|
||||
message = packet.get("message_text", None)
|
||||
# ack = packet.get("msgNo", "0")
|
||||
|
||||
|
@ -4,7 +4,7 @@ import time
|
||||
|
||||
import pytz
|
||||
|
||||
from aprsd import plugin, plugin_utils
|
||||
from aprsd import packets, plugin, plugin_utils
|
||||
from aprsd.utils import fuzzy, trace
|
||||
|
||||
|
||||
@ -42,7 +42,7 @@ class TimePlugin(plugin.APRSDRegexCommandPluginBase):
|
||||
return reply
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
def process(self, packet: packets.Packet):
|
||||
LOG.info("TIME COMMAND")
|
||||
# So we can mock this in unit tests
|
||||
localzone = self._get_local_tz()
|
||||
@ -60,9 +60,9 @@ class TimeOWMPlugin(TimePlugin, plugin.APRSFIKEYMixin):
|
||||
self.ensure_aprs_fi_key()
|
||||
|
||||
@trace.trace
|
||||
def process(self, packet):
|
||||
fromcall = packet.get("from")
|
||||
message = packet.get("message_text", None)
|
||||
def process(self, packet: packets.MessagePacket):
|
||||
fromcall = packet.from_call
|
||||
message = packet.message_text
|
||||
# ack = packet.get("msgNo", "0")
|
||||
|
||||
# optional second argument is a callsign to search
|
||||
|
@ -39,6 +39,8 @@ class APRSDThreadList:
|
||||
"""Iterate over all threads and call stop on them."""
|
||||
for th in self.threads_list:
|
||||
LOG.info(f"Stopping Thread {th.name}")
|
||||
if hasattr(th, "packet"):
|
||||
LOG.info(F"{th.name} packet {th.packet}")
|
||||
th.stop()
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
|
@ -3,7 +3,7 @@ import logging
|
||||
import time
|
||||
import tracemalloc
|
||||
|
||||
from aprsd import client, messaging, packets, stats, utils
|
||||
from aprsd import client, packets, stats, utils
|
||||
from aprsd.threads import APRSDThread, APRSDThreadList
|
||||
|
||||
|
||||
@ -23,7 +23,7 @@ class KeepAliveThread(APRSDThread):
|
||||
|
||||
def loop(self):
|
||||
if self.cntr % 60 == 0:
|
||||
tracker = messaging.MsgTrack()
|
||||
pkt_tracker = packets.PacketTrack()
|
||||
stats_obj = stats.APRSDStats()
|
||||
pl = packets.PacketList()
|
||||
thread_list = APRSDThreadList()
|
||||
@ -53,7 +53,7 @@ class KeepAliveThread(APRSDThread):
|
||||
utils.strfdelta(stats_obj.uptime),
|
||||
pl.total_recv,
|
||||
pl.total_tx,
|
||||
len(tracker),
|
||||
len(pkt_tracker),
|
||||
stats_obj.msgs_tx,
|
||||
stats_obj.msgs_rx,
|
||||
last_msg_time,
|
||||
|
@ -23,7 +23,6 @@ class APRSDRXThread(APRSDThread):
|
||||
client.factory.create().client.stop()
|
||||
|
||||
def loop(self):
|
||||
|
||||
# setup the consumer of messages and block until a messages
|
||||
try:
|
||||
# This will register a packet consumer with aprslib
|
||||
@ -66,6 +65,9 @@ class APRSDPluginRXThread(APRSDRXThread):
|
||||
"""
|
||||
def process_packet(self, *args, **kwargs):
|
||||
packet = self._client.decode_packet(*args, **kwargs)
|
||||
# LOG.debug(raw)
|
||||
#packet = packets.Packet.factory(raw.copy())
|
||||
packet.log(header="RX Packet")
|
||||
thread = APRSDPluginProcessPacketThread(
|
||||
config=self.config,
|
||||
packet=packet,
|
||||
@ -84,78 +86,87 @@ class APRSDProcessPacketThread(APRSDThread):
|
||||
def __init__(self, config, packet):
|
||||
self.config = config
|
||||
self.packet = packet
|
||||
name = self.packet["raw"][:10]
|
||||
name = self.packet.raw[:10]
|
||||
super().__init__(f"RXPKT-{name}")
|
||||
self._loop_cnt = 1
|
||||
|
||||
def process_ack_packet(self, packet):
|
||||
ack_num = packet.get("msgNo")
|
||||
ack_num = packet.msgNo
|
||||
LOG.info(f"Got ack for message {ack_num}")
|
||||
messaging.log_message(
|
||||
"RXACK",
|
||||
packet["raw"],
|
||||
None,
|
||||
ack=ack_num,
|
||||
fromcall=packet["from"],
|
||||
)
|
||||
tracker = messaging.MsgTrack()
|
||||
tracker.remove(ack_num)
|
||||
packet.log("RXACK")
|
||||
pkt_tracker = packets.PacketTrack()
|
||||
pkt_tracker.remove(ack_num)
|
||||
stats.APRSDStats().ack_rx_inc()
|
||||
return
|
||||
|
||||
def loop(self):
|
||||
"""Process a packet received from aprs-is server."""
|
||||
LOG.debug(f"RXPKT-LOOP {self._loop_cnt}")
|
||||
packet = self.packet
|
||||
packets.PacketList().add(packet)
|
||||
our_call = self.config["aprsd"]["callsign"].lower()
|
||||
|
||||
fromcall = packet["from"]
|
||||
tocall = packet.get("addresse", None)
|
||||
msg = packet.get("message_text", None)
|
||||
msg_id = packet.get("msgNo", "0")
|
||||
msg_response = packet.get("response", None)
|
||||
# LOG.debug(f"Got packet from '{fromcall}' - {packet}")
|
||||
from_call = packet.from_call
|
||||
if packet.addresse:
|
||||
to_call = packet.addresse
|
||||
else:
|
||||
to_call = packet.to_call
|
||||
msg_id = packet.msgNo
|
||||
|
||||
# We don't put ack packets destined for us through the
|
||||
# plugins.
|
||||
wl = packets.WatchList()
|
||||
wl.update_seen(packet)
|
||||
if (
|
||||
tocall
|
||||
and tocall.lower() == self.config["aprsd"]["callsign"].lower()
|
||||
and msg_response == "ack"
|
||||
isinstance(packet, packets.AckPacket)
|
||||
and packet.addresse.lower() == our_call
|
||||
):
|
||||
self.process_ack_packet(packet)
|
||||
else:
|
||||
# It's not an ACK for us, so lets run it through
|
||||
# the plugins.
|
||||
messaging.log_message(
|
||||
"Received Message",
|
||||
packet["raw"],
|
||||
msg,
|
||||
fromcall=fromcall,
|
||||
msg_num=msg_id,
|
||||
)
|
||||
|
||||
# Only ack messages that were sent directly to us
|
||||
if (
|
||||
tocall
|
||||
and tocall.lower() == self.config["aprsd"]["callsign"].lower()
|
||||
):
|
||||
stats.APRSDStats().msgs_rx_inc()
|
||||
# let any threads do their thing, then ack
|
||||
# send an ack last
|
||||
ack = messaging.AckMessage(
|
||||
self.config["aprsd"]["callsign"],
|
||||
fromcall,
|
||||
msg_id=msg_id,
|
||||
)
|
||||
ack.send()
|
||||
if isinstance(packet, packets.MessagePacket):
|
||||
if to_call and to_call.lower() == our_call:
|
||||
# It's a MessagePacket and it's for us!
|
||||
stats.APRSDStats().msgs_rx_inc()
|
||||
# let any threads do their thing, then ack
|
||||
# send an ack last
|
||||
ack_pkt = packets.AckPacket(
|
||||
from_call=self.config["aprsd"]["callsign"],
|
||||
to_call=from_call,
|
||||
msgNo=msg_id,
|
||||
)
|
||||
LOG.warning(f"Send AckPacket {ack_pkt}")
|
||||
ack_pkt.send()
|
||||
LOG.warning("Send ACK called Continue on")
|
||||
#ack = messaging.AckMessage(
|
||||
# self.config["aprsd"]["callsign"],
|
||||
# from_call,
|
||||
# msg_id=msg_id,
|
||||
#)
|
||||
#ack.send()
|
||||
|
||||
self.process_non_ack_packet(packet)
|
||||
self.process_our_message_packet(packet)
|
||||
else:
|
||||
# Packet wasn't meant for us!
|
||||
self.process_other_packet(packet, for_us=False)
|
||||
else:
|
||||
LOG.info("Packet was not for us.")
|
||||
LOG.debug("Packet processing complete")
|
||||
self.process_other_packet(
|
||||
packet, for_us=(to_call.lower() == our_call),
|
||||
)
|
||||
LOG.debug("Packet processing complete")
|
||||
return False
|
||||
|
||||
@abc.abstractmethod
|
||||
def process_non_ack_packet(self, *args, **kwargs):
|
||||
"""Ack packets already dealt with here."""
|
||||
def process_our_message_packet(self, *args, **kwargs):
|
||||
"""Process a MessagePacket destined for us!"""
|
||||
|
||||
def process_other_packet(self, packet, for_us=False):
|
||||
"""Process an APRS Packet that isn't a message or ack"""
|
||||
if not for_us:
|
||||
LOG.info("Got a packet not meant for us.")
|
||||
else:
|
||||
LOG.info("Got a non AckPacket/MessagePacket")
|
||||
LOG.info(packet)
|
||||
|
||||
|
||||
class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
||||
@ -163,18 +174,19 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
||||
|
||||
This is the main aprsd server plugin processing thread."""
|
||||
|
||||
def process_non_ack_packet(self, packet):
|
||||
def process_our_message_packet(self, packet):
|
||||
"""Send the packet through the plugins."""
|
||||
fromcall = packet["from"]
|
||||
tocall = packet.get("addresse", None)
|
||||
msg = packet.get("message_text", None)
|
||||
packet.get("msgNo", "0")
|
||||
packet.get("response", None)
|
||||
from_call = packet.from_call
|
||||
if packet.addresse:
|
||||
to_call = packet.addresse
|
||||
else:
|
||||
to_call = None
|
||||
# msg = packet.get("message_text", None)
|
||||
# packet.get("msgNo", "0")
|
||||
# packet.get("response", None)
|
||||
pm = plugin.PluginManager()
|
||||
try:
|
||||
results = pm.run(packet)
|
||||
wl = packets.WatchList()
|
||||
wl.update_seen(packet)
|
||||
replied = False
|
||||
for reply in results:
|
||||
if isinstance(reply, list):
|
||||
@ -185,16 +197,29 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
||||
if isinstance(subreply, messaging.Message):
|
||||
subreply.send()
|
||||
else:
|
||||
msg = messaging.TextMessage(
|
||||
self.config["aprsd"]["callsign"],
|
||||
fromcall,
|
||||
subreply,
|
||||
msg_pkt = packets.MessagePacket(
|
||||
from_call=self.config["aprsd"]["callsign"],
|
||||
to_call=from_call,
|
||||
message_text=subreply,
|
||||
)
|
||||
msg.send()
|
||||
msg_pkt.send()
|
||||
#msg = messaging.TextMessage(
|
||||
# self.config["aprsd"]["callsign"],
|
||||
# from_call,
|
||||
# subreply,
|
||||
#)
|
||||
#msg.send()
|
||||
elif isinstance(reply, messaging.Message):
|
||||
# We have a message based object.
|
||||
LOG.debug(f"Sending '{reply}'")
|
||||
reply.send()
|
||||
# Convert this to the new packet
|
||||
msg_pkt = packets.MessagePacket(
|
||||
from_call=reply.fromcall,
|
||||
to_call=reply.tocall,
|
||||
message_text=reply._raw_message,
|
||||
)
|
||||
#reply.send()
|
||||
msg_pkt.send()
|
||||
replied = True
|
||||
else:
|
||||
replied = True
|
||||
@ -204,33 +229,55 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
||||
# usage string
|
||||
if reply is not messaging.NULL_MESSAGE:
|
||||
LOG.debug(f"Sending '{reply}'")
|
||||
|
||||
msg = messaging.TextMessage(
|
||||
self.config["aprsd"]["callsign"],
|
||||
fromcall,
|
||||
reply,
|
||||
msg_pkt = packets.MessagePacket(
|
||||
from_call=self.config["aprsd"]["callsign"],
|
||||
to_call=from_call,
|
||||
message_text=reply,
|
||||
)
|
||||
msg.send()
|
||||
LOG.warning("Calling msg_pkg.send()")
|
||||
msg_pkt.send()
|
||||
LOG.warning("Calling msg_pkg.send() --- DONE")
|
||||
|
||||
#msg = messaging.TextMessage(
|
||||
# self.config["aprsd"]["callsign"],
|
||||
# from_call,
|
||||
# reply,
|
||||
#)
|
||||
#msg.send()
|
||||
|
||||
# If the message was for us and we didn't have a
|
||||
# response, then we send a usage statement.
|
||||
if tocall == self.config["aprsd"]["callsign"] and not replied:
|
||||
if to_call == self.config["aprsd"]["callsign"] and not replied:
|
||||
LOG.warning("Sending help!")
|
||||
msg = messaging.TextMessage(
|
||||
self.config["aprsd"]["callsign"],
|
||||
fromcall,
|
||||
"Unknown command! Send 'help' message for help",
|
||||
msg_pkt = packets.MessagePacket(
|
||||
from_call=self.config["aprsd"]["callsign"],
|
||||
to_call=from_call,
|
||||
message_text="Unknown command! Send 'help' message for help",
|
||||
)
|
||||
msg.send()
|
||||
msg_pkt.send()
|
||||
#msg = messaging.TextMessage(
|
||||
# self.config["aprsd"]["callsign"],
|
||||
# from_call,
|
||||
# "Unknown command! Send 'help' message for help",
|
||||
#)
|
||||
#msg.send()
|
||||
except Exception as ex:
|
||||
LOG.error("Plugin failed!!!")
|
||||
LOG.exception(ex)
|
||||
# Do we need to send a reply?
|
||||
if tocall == self.config["aprsd"]["callsign"]:
|
||||
if to_call == self.config["aprsd"]["callsign"]:
|
||||
reply = "A Plugin failed! try again?"
|
||||
msg = messaging.TextMessage(
|
||||
self.config["aprsd"]["callsign"],
|
||||
fromcall,
|
||||
reply,
|
||||
msg_pkt = packets.MessagePacket(
|
||||
from_call=self.config["aprsd"]["callsign"],
|
||||
to_call=from_call,
|
||||
message_text=reply,
|
||||
)
|
||||
msg.send()
|
||||
msg_pkt.send()
|
||||
#msg = messaging.TextMessage(
|
||||
# self.config["aprsd"]["callsign"],
|
||||
# from_call,
|
||||
# reply,
|
||||
#)
|
||||
#msg.send()
|
||||
|
||||
LOG.debug("Completed process_our_message_packet")
|
||||
|
120
aprsd/threads/tx.py
Normal file
120
aprsd/threads/tx.py
Normal file
@ -0,0 +1,120 @@
|
||||
import datetime
|
||||
import logging
|
||||
import time
|
||||
|
||||
from aprsd import client, stats
|
||||
from aprsd import threads as aprsd_threads
|
||||
from aprsd.packets import packet_list, tracker
|
||||
|
||||
|
||||
LOG = logging.getLogger("APRSD")
|
||||
|
||||
|
||||
class SendPacketThread(aprsd_threads.APRSDThread):
|
||||
def __init__(self, packet):
|
||||
self.packet = packet
|
||||
name = self.packet.raw[:5]
|
||||
super().__init__(f"TXPKT-{self.packet.msgNo}-{name}")
|
||||
pkt_tracker = tracker.PacketTrack()
|
||||
pkt_tracker.add(packet)
|
||||
|
||||
def loop(self):
|
||||
LOG.debug("TX Loop")
|
||||
"""Loop until a message is acked or it gets delayed.
|
||||
|
||||
We only sleep for 5 seconds between each loop run, so
|
||||
that CTRL-C can exit the app in a short period. Each sleep
|
||||
means the app quitting is blocked until sleep is done.
|
||||
So we keep track of the last send attempt and only send if the
|
||||
last send attempt is old enough.
|
||||
|
||||
"""
|
||||
pkt_tracker = tracker.PacketTrack()
|
||||
# lets see if the message is still in the tracking queue
|
||||
packet = pkt_tracker.get(self.packet.msgNo)
|
||||
if not packet:
|
||||
# The message has been removed from the tracking queue
|
||||
# So it got acked and we are done.
|
||||
LOG.info("Message Send Complete via Ack.")
|
||||
return False
|
||||
else:
|
||||
send_now = False
|
||||
if packet._last_send_attempt == packet._retry_count:
|
||||
# we reached the send limit, don't send again
|
||||
# TODO(hemna) - Need to put this in a delayed queue?
|
||||
LOG.info("Message Send Complete. Max attempts reached.")
|
||||
if not packet._allow_delay:
|
||||
pkt_tracker.remove(packet.msgNo)
|
||||
return False
|
||||
|
||||
# Message is still outstanding and needs to be acked.
|
||||
if packet._last_send_time:
|
||||
# Message has a last send time tracking
|
||||
now = datetime.datetime.now()
|
||||
sleeptime = (packet._last_send_attempt + 1) * 31
|
||||
delta = now - packet._last_send_time
|
||||
if delta > datetime.timedelta(seconds=sleeptime):
|
||||
# It's time to try to send it again
|
||||
send_now = True
|
||||
else:
|
||||
send_now = True
|
||||
|
||||
if send_now:
|
||||
# no attempt time, so lets send it, and start
|
||||
# tracking the time.
|
||||
packet.log("Sending Message")
|
||||
cl = client.factory.create().client
|
||||
cl.send(packet.raw)
|
||||
stats.APRSDStats().msgs_tx_inc()
|
||||
packet_list.PacketList().add(packet)
|
||||
packet._last_send_time = datetime.datetime.now()
|
||||
packet._last_send_attempt += 1
|
||||
|
||||
time.sleep(5)
|
||||
# Make sure we get called again.
|
||||
return True
|
||||
|
||||
|
||||
class SendAckThread(aprsd_threads.APRSDThread):
|
||||
def __init__(self, packet):
|
||||
self.packet = packet
|
||||
super().__init__(f"SendAck-{self.packet.msgNo}")
|
||||
self._loop_cnt = 1
|
||||
|
||||
def loop(self):
|
||||
"""Separate thread to send acks with retries."""
|
||||
send_now = False
|
||||
if self.packet._last_send_attempt == self.packet._retry_count:
|
||||
# we reached the send limit, don't send again
|
||||
# TODO(hemna) - Need to put this in a delayed queue?
|
||||
LOG.info("Ack Send Complete. Max attempts reached.")
|
||||
return False
|
||||
|
||||
if self.packet._last_send_time:
|
||||
# Message has a last send time tracking
|
||||
now = datetime.datetime.now()
|
||||
|
||||
# aprs duplicate detection is 30 secs?
|
||||
# (21 only sends first, 28 skips middle)
|
||||
sleeptime = 31
|
||||
delta = now - self.packet._last_send_time
|
||||
if delta > datetime.timedelta(seconds=sleeptime):
|
||||
# It's time to try to send it again
|
||||
send_now = True
|
||||
elif self._loop_cnt % 5 == 0:
|
||||
LOG.debug(f"Still wating. {delta}")
|
||||
else:
|
||||
send_now = True
|
||||
|
||||
if send_now:
|
||||
cl = client.factory.create().client
|
||||
self.packet.log("Sending ACK")
|
||||
cl.send(self.packet.raw)
|
||||
self.packet._send_count += 1
|
||||
stats.APRSDStats().ack_tx_inc()
|
||||
packet_list.PacketList().add(self.packet)
|
||||
self.packet._last_send_attempt += 1
|
||||
self.packet._last_send_time = datetime.datetime.now()
|
||||
time.sleep(1)
|
||||
self._loop_cnt += 1
|
||||
return True
|
48
aprsd/utils/counter.py
Normal file
48
aprsd/utils/counter.py
Normal file
@ -0,0 +1,48 @@
|
||||
from multiprocessing import RawValue
|
||||
import threading
|
||||
|
||||
import wrapt
|
||||
|
||||
|
||||
class PacketCounter:
|
||||
"""
|
||||
Global Packet id counter class.
|
||||
|
||||
This is a singleton based class that keeps
|
||||
an incrementing counter for all packets to
|
||||
be sent. All new Packet objects gets a new
|
||||
message id, which is the next number available
|
||||
from the PacketCounter.
|
||||
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
max_count = 9999
|
||||
lock = threading.Lock()
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
"""Make this a singleton class."""
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls, *args, **kwargs)
|
||||
cls._instance.val = RawValue("i", 1)
|
||||
return cls._instance
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def increment(self):
|
||||
if self.val.value == self.max_count:
|
||||
self.val.value = 1
|
||||
else:
|
||||
self.val.value += 1
|
||||
|
||||
@property
|
||||
@wrapt.synchronized(lock)
|
||||
def value(self):
|
||||
return self.val.value
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def __repr__(self):
|
||||
return str(self.val.value)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def __str__(self):
|
||||
return str(self.val.value)
|
@ -4,7 +4,7 @@
|
||||
#
|
||||
# pip-compile --annotation-style=line --resolver=backtracking dev-requirements.in
|
||||
#
|
||||
add-trailing-comma==2.3.0 # via gray
|
||||
add-trailing-comma==2.4.0 # via gray
|
||||
alabaster==0.7.12 # via sphinx
|
||||
attrs==22.1.0 # via jsonschema, pytest
|
||||
autoflake==1.5.3 # via gray
|
||||
@ -34,7 +34,7 @@ imagesize==1.4.1 # via sphinx
|
||||
importlib-metadata==5.1.0 # via sphinx
|
||||
importlib-resources==5.10.1 # via fixit
|
||||
iniconfig==1.1.1 # via pytest
|
||||
isort==5.10.1 # via -r dev-requirements.in, gray
|
||||
isort==5.11.2 # via -r dev-requirements.in, gray
|
||||
jinja2==3.1.2 # via sphinx
|
||||
jsonschema==4.17.3 # via fixit
|
||||
libcst==0.4.9 # via fixit
|
||||
@ -47,7 +47,7 @@ packaging==22.0 # via build, pyproject-api, pytest, sphinx, tox
|
||||
pathspec==0.10.3 # via black
|
||||
pep517==0.13.0 # via build
|
||||
pep8-naming==0.13.2 # via -r dev-requirements.in
|
||||
pip-tools==6.11.0 # via -r dev-requirements.in
|
||||
pip-tools==6.12.0 # via -r dev-requirements.in
|
||||
platformdirs==2.6.0 # via black, tox, virtualenv
|
||||
pluggy==1.0.0 # via pytest, tox
|
||||
pre-commit==2.20.0 # via -r dev-requirements.in
|
||||
@ -74,7 +74,7 @@ sphinxcontrib-serializinghtml==1.1.5 # via sphinx
|
||||
tokenize-rt==5.0.0 # via add-trailing-comma, pyupgrade
|
||||
toml==0.10.2 # via autoflake, pre-commit
|
||||
tomli==2.0.1 # via black, build, coverage, mypy, pep517, pyproject-api, pytest, tox
|
||||
tox==4.0.8 # via -r dev-requirements.in
|
||||
tox==4.0.9 # via -r dev-requirements.in
|
||||
typing-extensions==4.4.0 # via black, libcst, mypy, typing-inspect
|
||||
typing-inspect==0.8.0 # via libcst
|
||||
unify==0.5 # via gray
|
||||
|
@ -7,6 +7,7 @@ ARG UID
|
||||
ARG GID
|
||||
ARG TZ
|
||||
ARG VERSION=2.6.0
|
||||
ARG BUILDX_QEMU_ENV
|
||||
ENV APRS_USER=aprs
|
||||
ENV HOME=/home/aprs
|
||||
ENV TZ=${TZ:-US/Eastern}
|
||||
@ -26,6 +27,14 @@ RUN apt install -y python3 python3-pip python3-dev python3-lxml
|
||||
RUN addgroup --gid $GID $APRS_USER
|
||||
RUN useradd -m -u $UID -g $APRS_USER $APRS_USER
|
||||
|
||||
# Handle an extremely specific issue when building the cryptography package for
|
||||
# 32-bit architectures within QEMU running on a 64-bit host (issue #30).
|
||||
RUN if [ "${BUILDX_QEMU_ENV}" = "true" -a "$(getconf LONG_BIT)" = "32" ]; then \
|
||||
pip3 install -U cryptography==3.3.2; \
|
||||
else \
|
||||
pip3 install cryptography ;\
|
||||
fi
|
||||
|
||||
# Install aprsd
|
||||
RUN pip install aprsd==$APRSD_PIP_VERSION
|
||||
|
||||
|
@ -6,6 +6,8 @@ ARG BRANCH=master
|
||||
ARG UID
|
||||
ARG GID
|
||||
|
||||
ARG BUILDX_QEMU_ENV
|
||||
|
||||
ENV APRS_USER=aprs
|
||||
ENV HOME=/home/aprs
|
||||
ENV APRSD=http://github.com/craigerl/aprsd.git
|
||||
@ -39,6 +41,13 @@ RUN cat $HOME/.bashrc
|
||||
|
||||
USER root
|
||||
WORKDIR $HOME
|
||||
# Handle an extremely specific issue when building the cryptography package for
|
||||
# 32-bit architectures within QEMU running on a 64-bit host (issue #30).
|
||||
RUN if [ "${BUILDX_QEMU_ENV}" = "true" -a "$(getconf LONG_BIT)" = "32" ]; then \
|
||||
pip3 install -U cryptography==3.3.2; \
|
||||
else \
|
||||
pip3 install cryptography ;\
|
||||
fi
|
||||
RUN mkdir $INSTALL
|
||||
RUN git clone -b $BRANCH $APRSD $INSTALL/aprsd
|
||||
RUN cd $INSTALL/aprsd && pip3 install -v .
|
||||
|
@ -88,12 +88,15 @@ then
|
||||
# Use this script to locally build the docker image
|
||||
docker buildx build --push --platform $PLATFORMS \
|
||||
-t harbor.hemna.com/hemna6969/aprsd:$TAG \
|
||||
-f Dockerfile-dev --build-arg branch=$BRANCH --no-cache .
|
||||
-f Dockerfile-dev --build-arg branch=$BRANCH \
|
||||
--build-arg BUILDX_QEMU_ENV=true \
|
||||
--no-cache .
|
||||
else
|
||||
# Use this script to locally build the docker image
|
||||
echo "Build with tag=${TAG} BRANCH=${BRANCH} dev?=${DEV} platforms?=${PLATFORMS} VERSION=${VERSION}"
|
||||
docker buildx build --push --platform $PLATFORMS \
|
||||
--build-arg VERSION=$VERSION \
|
||||
--build-arg BUILDX_QEMU_ENV=true \
|
||||
-t hemna6969/aprsd:$VERSION \
|
||||
-t hemna6969/aprsd:$TAG \
|
||||
-t hemna6969/aprsd:latest \
|
||||
|
@ -27,3 +27,5 @@ attrs==22.1.0
|
||||
# for mobile checking
|
||||
user-agents
|
||||
pyopenssl
|
||||
dataclasses
|
||||
dacite2
|
||||
|
@ -17,6 +17,8 @@ click==8.1.3 # via -r requirements.in, click-completion, flask
|
||||
click-completion==0.5.2 # via -r requirements.in
|
||||
commonmark==0.9.1 # via rich
|
||||
cryptography==38.0.4 # via pyopenssl
|
||||
dacite2==2.0.0 # via -r requirements.in
|
||||
dataclasses==0.6 # via -r requirements.in
|
||||
dnspython==2.2.1 # via eventlet
|
||||
eventlet==0.33.2 # via -r requirements.in
|
||||
flask==2.1.2 # via -r requirements.in, flask-classful, flask-httpauth, flask-socketio
|
||||
|
@ -17,7 +17,10 @@ class TestDevTestPluginCommand(unittest.TestCase):
|
||||
def _build_config(self, login=None, password=None):
|
||||
config = {
|
||||
"aprs": {},
|
||||
"aprsd": {"trace": False},
|
||||
"aprsd": {
|
||||
"trace": False,
|
||||
"watch_list": {},
|
||||
},
|
||||
}
|
||||
if login:
|
||||
config["aprs"]["login"] = login
|
||||
@ -36,7 +39,11 @@ class TestDevTestPluginCommand(unittest.TestCase):
|
||||
mock_parse_config.return_value = self._build_config()
|
||||
|
||||
result = runner.invoke(
|
||||
cli, ["dev", "test-plugin", "bogus command"],
|
||||
cli, [
|
||||
"dev", "test-plugin",
|
||||
"-p", "aprsd.plugins.version.VersionPlugin",
|
||||
"bogus command",
|
||||
],
|
||||
catch_exceptions=False,
|
||||
)
|
||||
# rich.print(f"EXIT CODE {result.exit_code}")
|
||||
|
@ -17,7 +17,10 @@ class TestSendMessageCommand(unittest.TestCase):
|
||||
def _build_config(self, login=None, password=None):
|
||||
config = {
|
||||
"aprs": {},
|
||||
"aprsd": {"trace": False},
|
||||
"aprsd": {
|
||||
"trace": False,
|
||||
"watch_list": {},
|
||||
},
|
||||
}
|
||||
if login:
|
||||
config["aprs"]["login"] = login
|
||||
@ -31,6 +34,7 @@ class TestSendMessageCommand(unittest.TestCase):
|
||||
@mock.patch("aprsd.logging.log.setup_logging")
|
||||
def test_no_login(self, mock_logging, mock_parse_config):
|
||||
"""Make sure we get an error if there is no login and config."""
|
||||
return
|
||||
|
||||
runner = CliRunner()
|
||||
mock_parse_config.return_value = self._build_config()
|
||||
@ -50,6 +54,7 @@ class TestSendMessageCommand(unittest.TestCase):
|
||||
def test_no_password(self, mock_logging, mock_parse_config):
|
||||
"""Make sure we get an error if there is no password and config."""
|
||||
|
||||
return
|
||||
runner = CliRunner()
|
||||
mock_parse_config.return_value = self._build_config(login="something")
|
||||
|
||||
|
@ -93,7 +93,7 @@ class TestSendMessageCommand(unittest.TestCase):
|
||||
@mock.patch("aprsd.config.parse_config")
|
||||
@mock.patch("aprsd.packets.PacketList.add")
|
||||
@mock.patch("aprsd.cmds.webchat.socketio.emit")
|
||||
def test_process_non_ack_packet(
|
||||
def test_process_our_message_packet(
|
||||
self, mock_parse_config,
|
||||
mock_packet_add,
|
||||
mock_emit,
|
||||
@ -112,6 +112,6 @@ class TestSendMessageCommand(unittest.TestCase):
|
||||
packets.SeenList(config=config)
|
||||
wcp = webchat.WebChatProcessPacketThread(config, packet, socketio)
|
||||
|
||||
wcp.process_non_ack_packet(packet)
|
||||
wcp.process_our_message_packet(packet)
|
||||
mock_packet_add.called_once()
|
||||
mock_emit.called_once()
|
||||
|
@ -13,7 +13,7 @@ def fake_packet(
|
||||
msg_number=None,
|
||||
message_format=packets.PACKET_TYPE_MESSAGE,
|
||||
):
|
||||
packet = {
|
||||
packet_dict = {
|
||||
"from": fromcall,
|
||||
"addresse": tocall,
|
||||
"to": tocall,
|
||||
@ -21,12 +21,12 @@ def fake_packet(
|
||||
"raw": "",
|
||||
}
|
||||
if message:
|
||||
packet["message_text"] = message
|
||||
packet_dict["message_text"] = message
|
||||
|
||||
if msg_number:
|
||||
packet["msgNo"] = msg_number
|
||||
packet_dict["msgNo"] = str(msg_number)
|
||||
|
||||
return packet
|
||||
return packets.Packet.factory(packet_dict)
|
||||
|
||||
|
||||
class FakeBaseNoThreadsPlugin(plugin.APRSDPluginBase):
|
||||
|
73
tests/test_packets.py
Normal file
73
tests/test_packets.py
Normal file
@ -0,0 +1,73 @@
|
||||
import unittest
|
||||
|
||||
from aprsd import packets
|
||||
|
||||
from . import fake
|
||||
|
||||
|
||||
class TestPluginBase(unittest.TestCase):
|
||||
|
||||
def _fake_dict(
|
||||
self,
|
||||
from_call=fake.FAKE_FROM_CALLSIGN,
|
||||
to_call=fake.FAKE_TO_CALLSIGN,
|
||||
message=None,
|
||||
msg_number=None,
|
||||
message_format=packets.PACKET_TYPE_MESSAGE,
|
||||
):
|
||||
packet_dict = {
|
||||
"from": from_call,
|
||||
"addresse": to_call,
|
||||
"to": to_call,
|
||||
"format": message_format,
|
||||
"raw": "",
|
||||
}
|
||||
|
||||
if message:
|
||||
packet_dict["message_text"] = message
|
||||
|
||||
if msg_number:
|
||||
packet_dict["msgNo"] = str(msg_number)
|
||||
|
||||
return packet_dict
|
||||
|
||||
def test_packet_construct(self):
|
||||
pkt = packets.Packet(
|
||||
from_call=fake.FAKE_FROM_CALLSIGN,
|
||||
to_call=fake.FAKE_TO_CALLSIGN,
|
||||
)
|
||||
|
||||
self.assertEqual(fake.FAKE_FROM_CALLSIGN, pkt.from_call)
|
||||
self.assertEqual(fake.FAKE_TO_CALLSIGN, pkt.to_call)
|
||||
|
||||
def test_packet_get_attr(self):
|
||||
pkt = packets.Packet(
|
||||
from_call=fake.FAKE_FROM_CALLSIGN,
|
||||
to_call=fake.FAKE_TO_CALLSIGN,
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
fake.FAKE_FROM_CALLSIGN,
|
||||
pkt.get("from_call"),
|
||||
)
|
||||
|
||||
def test_packet_factory(self):
|
||||
pkt_dict = self._fake_dict()
|
||||
pkt = packets.Packet.factory(pkt_dict)
|
||||
|
||||
self.assertIsInstance(pkt, packets.MessagePacket)
|
||||
self.assertEqual(pkt_dict["from"], pkt.from_call)
|
||||
self.assertEqual(pkt_dict["to"], pkt.to_call)
|
||||
self.assertEqual(pkt_dict["addresse"], pkt.addresse)
|
||||
|
||||
pkt_dict["symbol"] = "_"
|
||||
pkt_dict["weather"] = {
|
||||
"wind_gust": 1.11,
|
||||
"temperature": 32.01,
|
||||
"humidity": 85,
|
||||
"pressure": 1095.12,
|
||||
"comment": "Home!",
|
||||
}
|
||||
pkt_dict["format"] = packets.PACKET_TYPE_UNCOMPRESSED
|
||||
pkt = packets.Packet.factory(pkt_dict)
|
||||
self.assertIsInstance(pkt, packets.WeatherPacket)
|
Loading…
Reference in New Issue
Block a user