Refactor networking and commands

This patch refactors the socket management
to use select, as well as refactor all of the
commands into a COMMAND_ENVELOPE dictionary.

This patch also adds the Dockerfile and
docker-compose.yml files
This commit is contained in:
Hemna 2020-12-11 14:23:08 -05:00
parent 43509ea9e6
commit 00d99bc2c4
6 changed files with 358 additions and 272 deletions

View File

@ -1,6 +1,6 @@
name: python
on: [push]
on: [push, pull_request]
jobs:
tox:

44
Dockerfile Normal file
View File

@ -0,0 +1,44 @@
FROM ubuntu:20.04 as aprsd
ENV VERSION=1.0.0
ENV APRS_USER=aprs
ENV HOME=/home/aprs
ENV APRSD=http://github.com/craigerl/aprsd.git
ENV APRSD_BRANCH="master"
ENV INSTALL=$HOME/install
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get -y update
RUN apt-get install -y wget gnupg git-core
RUN apt-get install -y apt-utils pkg-config sudo
RUN apt-get install -y python3 python3-pip python3-virtualenv
# Setup Timezone
ENV TZ=US/Eastern
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt-get install -y tzdata
RUN dpkg-reconfigure --frontend noninteractive tzdata
RUN addgroup --gid 1000 $APRS_USER
RUN useradd -m -u 1000 -g 1000 -p $APRS_USER $APRS_USER
USER $APRS_USER
RUN echo "export PATH=\$PATH:\$HOME/.local/bin" >> $HOME/.bashrc
VOLUME ["/config"]
WORKDIR $HOME
RUN mkdir $INSTALL
# install librtlsdr from source
RUN git clone -b $APRSD_BRANCH $APRSD $INSTALL/aprsd
USER root
RUN cd $INSTALL/aprsd && pip3 install .
RUN aprsd sample-config > /config/aprsd.yml
# override this to run another configuration
ENV CONF default
USER $APRS_USER
ADD build/bin/run.sh $HOME/
ENTRYPOINT ["/home/aprs/run.sh"]

View File

@ -29,6 +29,7 @@ import logging
import os
import pprint
import re
import select
import signal
import smtplib
import socket
@ -172,7 +173,6 @@ def install(append, case_insensitive, shell, path):
def setup_connection():
global sock
global sock_file
connected = False
while not connected:
try:
@ -181,7 +181,7 @@ def setup_connection():
sock.connect((CONFIG["aprs"]["host"], 14580))
connected = True
LOG.debug("Connected to server: " + CONFIG["aprs"]["host"])
sock_file = sock.makefile(mode="r")
# sock_file = sock.makefile(mode="r")
# sock_file = sock.makefile(mode='r', encoding=None, errors=None, newline=None)
# sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # disable nagle algorithm
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 512) # buffer size
@ -196,6 +196,7 @@ def setup_connection():
LOG.debug("Logging in to APRS-IS with user '%s'" % user)
msg = "user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__)
sock.send(msg.encode())
return sock
def signal_handler(signal, frame):
@ -225,7 +226,7 @@ def parse_email(msgid, data, server):
# default in case body somehow isn't set below - happened once
body = "* unreadable msg received"
# this uses the last text or html part in the email, phone companies often put content in an attachment
for ( part) in ( msg.get_payload()):
for part in msg.get_payload():
if (
part.get_content_charset() is None
): # or BREAK when we hit a text or html?
@ -394,7 +395,7 @@ def resend_email(count, fromcall):
# reverse lookup of a shortcut
from_addr = shortcuts_inverted[from_addr]
# asterisk indicates a resend
reply = "-" + from_addr + " * " + body.decode(errors='ignore')
reply = "-" + from_addr + " * " + body.decode(errors="ignore")
send_message(fromcall, reply)
msgexists = True
@ -424,7 +425,7 @@ def resend_email(count, fromcall):
def check_email_thread():
global check_email_delay
#LOG.debug("FIXME initial email delay is 10 seconds")
# LOG.debug("FIXME initial email delay is 10 seconds")
check_email_delay = 60
while True:
# LOG.debug("Top of check_email_thread.")
@ -514,17 +515,16 @@ def send_ack_thread(tocall, ack, retry_count):
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
time.sleep(31)
return ()
# end_send_ack_thread
def send_ack(tocall, ack):
LOG.debug("Send ACK({}:{}) to radio.".format(tocall, ack))
retry_count = 3
thread = threading.Thread(
target=send_ack_thread, name="send_ack", args=(tocall, ack, retry_count)
)
thread.start()
return ()
# end send_ack()
@ -701,102 +701,19 @@ def sample_config():
click.echo(yaml.dump(utils.DEFAULT_CONFIG_DICT))
# main() ###
@main.command()
@click.option(
"--loglevel",
default="DEBUG",
show_default=True,
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], case_sensitive=False
),
show_choices=True,
help="The log level to use for aprsd.log",
)
@click.option("--quiet", is_flag=True, default=False, help="Don't log to stdout")
@click.option(
"-c",
"--config",
"config_file",
show_default=True,
default=utils.DEFAULT_CONFIG_FILE,
help="The aprsd config file to use for options.",
)
def server(loglevel, quiet, config_file):
"""Start the aprsd server process."""
global CONFIG
COMMAND_ENVELOPE = {
"email": {"command": "^-.*", "function": "command_email"},
"fortune": {"command": "^[fF]", "function": "command_fortune"},
"location": {"command": "^[lL]", "function": "command_location"},
"weather": {"command": "^[wW]", "function": "command_weather"},
"ping": {"command": "^[pP]", "function": "command_ping"},
"time": {"command": "^[tT]", "function": "command_time"},
}
CONFIG = utils.parse_config(config_file)
signal.signal(signal.SIGINT, signal_handler)
setup_logging(loglevel, quiet)
LOG.info("APRSD Started version: {}".format(aprsd.__version__))
time.sleep(2)
setup_connection()
valid = validate_email()
if not valid:
LOG.error("Failed to validate email config options")
sys.exit(-1)
def command_email(fromcall, message, ack):
LOG.info("Email COMMAND")
user = CONFIG["aprs"]["login"]
# password = CONFIG["aprs"]["password"]
# LOG.debug("LOGIN to APRSD with user '%s'" % user)
# msg = ("user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__))
# sock.send(msg.encode())
time.sleep(2)
checkemailthread = threading.Thread(
target=check_email_thread, name="check_email", args=()
) # args must be tuple
checkemailthread.start()
LOG.debug("reset empty line counter")
empty_line_rx = 0
while True:
LOG.debug("Main loop start")
time.sleep(1) # prevent tight loop if something goes awry
line = ""
try:
line = sock_file.readline().strip()
LOG.info("APRS-IS: " + line)
# is aprs message to us? not beacon, status, empty line, etc
searchstring = "::%s" % user
if re.search(searchstring, line):
LOG.debug("main: found message addressed to us begin process_message")
(fromcall, message, ack) = process_message(line)
LOG.debug("reset empty line counter")
empty_line_rx = 0
else:
# LOG.debug("Noise: " + line)
# detect closed socket, getting lots of empty lines
if len(line.strip()) == 0:
LOG.debug(
"Zero line length received. Consecutive empty line count: "
+ str(empty_line_rx)
)
empty_line_rx += 1
if empty_line_rx >= 30:
LOG.debug(
"Excessive empty lines received, socket likely CLOSED_WAIT. Reconnecting."
)
empty_line_rx = 0
raise Exception("closed_socket")
continue # line is something we don't care about
# ACK (ack##)
if re.search("^ack[0-9]+", message):
LOG.debug("ACK")
# put message_number:1 in dict to record the ack
a = re.search("^ack([0-9]+)", message)
ack_dict.update({int(a.group(1)): 1})
continue # break out of this so we don't ack an ack at the end
# EMAIL (-)
# is email command
elif re.search("^-.*", message):
LOG.debug("EMAIL")
searchstring = "^" + CONFIG["ham"]["callsign"] + ".*"
# only I can do email
if re.search(searchstring, fromcall):
@ -813,11 +730,9 @@ def server(loglevel, quiet, config_file):
content = a.group(2)
# send recipient link to aprs.fi map
if content == "mapme":
content = (
"Click for my location: http://aprs.fi/{}".format(
content = "Click for my location: http://aprs.fi/{}".format(
CONFIG["ham"]["callsign"]
)
)
too_soon = 0
now = time.time()
# see if we sent this msg number recently
@ -850,60 +765,28 @@ def server(loglevel, quiet, config_file):
else:
send_message(fromcall, "Bad email address")
# TIME (t)
elif re.search("^[tT]", message):
LOG.debug("TIME")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
cur_time = fuzzy(h, m, 1)
reply = (
cur_time
+ " ("
+ str(h)
+ ":"
+ str(m).rjust(2, "0")
+ "PDT)"
+ " ("
+ message.rstrip()
+ ")"
)
thread = threading.Thread(
target=send_message, name="send_message", args=(fromcall, reply)
)
thread.start()
return (fromcall, message, ack)
# FORTUNE (f)
elif re.search("^[fF]", message):
LOG.debug("FORTUNE")
def command_fortune(fromcall, message, ack):
try:
process = subprocess.Popen(
["/usr/games/fortune", "-s", "-n 60"], stdout=subprocess.PIPE
)
reply = process.communicate()[0]
# send_message(fromcall, reply.rstrip())
reply = reply.decode(errors="ignore")
send_message(fromcall, reply.rstrip())
reply = reply.decode(errors="ignore").rstrip()
# PING (p)
elif re.search("^[pP]", message):
LOG.debug("PING")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
s = stm.tm_sec
reply = (
"Pong! "
+ str(h).zfill(2)
+ ":"
+ str(m).zfill(2)
+ ":"
+ str(s).zfill(2)
)
send_message(fromcall, reply.rstrip())
except Exception as ex:
reply = "Fortune command failed '{}'".format(ex)
LOG.error(reply)
# LOCATION (l) "8 Miles E Auburn CA 1771' 38.91547,-120.99500 0.1h ago"
elif re.search("^[lL]", message):
LOG.debug("LOCATION")
send_message(fromcall, reply)
return (fromcall, message, ack)
def command_location(fromcall, message, ack):
LOG.info("Location COMMAND")
# get last location of a callsign, get descriptive name from weather service
try:
a = re.search(
@ -943,37 +826,29 @@ def server(loglevel, quiet, config_file):
+ "&FcstType=json"
)
response2 = requests.get(url2)
# wx_data = json.loads(response2.read())
wx_data = json.loads(response2.text)
reply = (
searchcall
+ ": "
+ wx_data["location"]["areaDescription"]
+ " "
+ str(altfeet)
+ "' "
+ str(lat)
+ ","
+ str(lon)
+ " "
+ str("%.1f" % round(delta_hours, 1))
+ "h ago"
reply = "{}: {} {}' {},{} {}h ago".format(
searchcall,
wx_data["location"]["areaDescription"],
str(altfeet),
str(alt),
str(lon),
str("%.1f" % round(delta_hours, 1)),
)
# reply = reply.encode('ascii', errors='ignore') # unicode to ascii
send_message(fromcall, reply.rstrip())
except Exception as e:
LOG.debug("Locate failed with: " + "%s" % str(e))
reply = (
"Unable to find station " + searchcall + ". Sending beacons?"
)
reply = "Unable to find station " + searchcall + ". Sending beacons?"
send_message(fromcall, reply.rstrip())
# WEATHER (w) "42F(68F/48F) Haze. Tonight, Haze then Chance Rain."
elif re.search("^[wW]", message):
LOG.debug("WEATHER")
# get my last location from aprsis then get weather from
# weather service
return (fromcall, message, ack)
def command_weather(fromcall, message, ack):
"""Do weather command and send response."""
LOG.info("WEATHER COMMAND")
try:
url = (
"http://api.aprs.fi/api/get?"
@ -1005,22 +880,133 @@ def server(loglevel, quiet, config_file):
except Exception as e:
LOG.debug("Weather failed with: " + "%s" % str(e))
reply = "Unable to find you (send beacon?)"
send_message(fromcall, reply)
# USAGE
return (fromcall, message, ack)
def command_ping(fromcall, message, ack):
LOG.info("PING COMMAND")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
s = stm.tm_sec
reply = "Pong! " + str(h).zfill(2) + ":" + str(m).zfill(2) + ":" + str(s).zfill(2)
send_message(fromcall, reply.rstrip())
return (fromcall, message, ack)
def command_time(fromcall, message, ack):
LOG.info("TIME COMMAND")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
cur_time = fuzzy(h, m, 1)
reply = "{} ({}:{} PDT) ({})".format(
cur_time, str(h), str(m).rjust(2, "0"), message.rstrip()
)
thread = threading.Thread(
target=send_message, name="send_message", args=(fromcall, reply)
)
thread.start()
return (fromcall, message, ack)
# main() ###
@main.command()
@click.option(
"--loglevel",
default="DEBUG",
show_default=True,
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], case_sensitive=False
),
show_choices=True,
help="The log level to use for aprsd.log",
)
@click.option("--quiet", is_flag=True, default=False, help="Don't log to stdout")
@click.option(
"-c",
"--config",
"config_file",
show_default=True,
default=utils.DEFAULT_CONFIG_FILE,
help="The aprsd config file to use for options.",
)
def server(loglevel, quiet, config_file):
"""Start the aprsd server process."""
global CONFIG
CONFIG = utils.parse_config(config_file)
signal.signal(signal.SIGINT, signal_handler)
setup_logging(loglevel, quiet)
LOG.info("APRSD Started version: {}".format(aprsd.__version__))
time.sleep(2)
client_sock = setup_connection()
valid = validate_email()
if not valid:
LOG.error("Failed to validate email config options")
sys.exit(-1)
user = CONFIG["aprs"]["login"]
LOG.debug("Looking for messages for user '{}'".format(user))
# password = CONFIG["aprs"]["password"]
# LOG.debug("LOGIN to APRSD with user '%s'" % user)
# msg = ("user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__))
# sock.send(msg.encode())
time.sleep(2)
checkemailthread = threading.Thread(
target=check_email_thread, name="check_email", args=()
) # args must be tuple
checkemailthread.start()
read_sockets = [client_sock]
fromcall = message = ack = None
while True:
LOG.debug("Main loop start")
reconnect = False
message = None
try:
readable, writable, exceptional = select.select(read_sockets, [], [])
for s in readable:
data = s.recv(10240).decode().strip()
if data:
LOG.info("APRS-IS({}): {}".format(len(data), data))
searchstring = "::%s" % user
if re.search(searchstring, data):
LOG.debug(
"main: found message addressed to us begin process_message"
)
(fromcall, message, ack) = process_message(data)
else:
LOG.debug("USAGE")
reply = "Usage: weather, locate <callsign>, ping, time, fortune"
send_message(fromcall, reply)
LOG.error("Connection Failed. retrying to connect")
read_sockets.remove(s)
s.close()
time.sleep(2)
client_sock = setup_connection()
read_sockets.append(client_sock)
reconnect = True
# let any threads do their thing, then ack
time.sleep(1)
# send an ack last
LOG.debug("Send ACK to radio.")
send_ack(fromcall, ack)
for s in exceptional:
LOG.error("Connection Failed. retrying to connect")
read_sockets.remove(s)
s.close()
time.sleep(2)
client_sock = setup_connection()
read_sockets.append(client_sock)
reconnect = True
if reconnect:
# start the loop over
LOG.warning("Starting Main loop over.")
continue
except Exception as e:
LOG.error("Error in mainline loop:")
LOG.exception(e)
LOG.error("%s" % str(e))
if (
str(e) == "closed_socket"
@ -1029,16 +1015,46 @@ def server(loglevel, quiet, config_file):
or str(e) == "Network is unreachable"
):
LOG.error("Attempting to reconnect.")
sock_file.close()
sock.shutdown(0)
sock.close()
setup_connection()
client_sock = setup_connection()
continue
LOG.error("Unexpected error: " + str(e))
LOG.error("Continuing anyway.")
time.sleep(5)
continue # don't know what failed, so wait and then continue main loop again
if not message:
continue
LOG.debug("Process the command. '{}'".format(message))
# ACK (ack##)
# Custom command due to needing to avoid send_ack
if re.search("^ack[0-9]+", message):
LOG.debug("ACK")
# put message_number:1 in dict to record the ack
a = re.search("^ack([0-9]+)", message)
ack_dict.update({int(a.group(1)): 1})
continue # break out of this so we don't ack an ack at the end
# it's not an ack, so try and process user input
found_command = False
for key in COMMAND_ENVELOPE:
if re.search(COMMAND_ENVELOPE[key]["command"], message):
# now call the registered function
funct = COMMAND_ENVELOPE[key]["function"]
(fromcall, message, ack) = globals()[funct](fromcall, message, ack)
found_command = True
if not found_command:
reply = "Usage: {}".format(", ".join(COMMAND_ENVELOPE.keys()))
send_message(fromcall, reply)
# let any threads do their thing, then ack
time.sleep(1)
# send an ack last
send_ack(fromcall, ack)
LOG.debug("Main loop end")
# end while True

12
build/bin/run.sh Executable file
View File

@ -0,0 +1,12 @@
#!/usr/bin/env bash
export PATH=$PATH:$HOME/.local/bin
# check to see if there is a config file
APRSD_CONFIG="/config/aprsd.yml"
if [ ! -e "$APRSD_CONFIG" ]; then
echo "'$APRSD_CONFIG' File does not exist. Creating."
aprsd sample-config > $APRSD_CONFIG
fi
aprsd server -c $APRSD_CONFIG

4
build/build.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
# Use this script to locally build the docker image
docker build --no-cache -t hemna6969/aprsd:latest ..

10
docker-compose.yml Normal file
View File

@ -0,0 +1,10 @@
version: "3"
services:
aprsd:
image: hemna6969/aprsd:latest
container_name: aprsd
volumes:
- /opt/docker/aprsd/config:/config
restart: unless-stopped
environment:
- TZ=America/New_York