Merge pull request #14 from hemna/socket_select

Refactor networking and commands
This commit is contained in:
Craig Lamparter 2020-12-12 09:10:52 -08:00 committed by GitHub
commit 8c9c12b3fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 358 additions and 272 deletions

View File

@ -1,6 +1,6 @@
name: python
on: [push]
on: [push, pull_request]
jobs:
tox:

44
Dockerfile Normal file
View File

@ -0,0 +1,44 @@
FROM ubuntu:20.04 as aprsd
ENV VERSION=1.0.0
ENV APRS_USER=aprs
ENV HOME=/home/aprs
ENV APRSD=http://github.com/craigerl/aprsd.git
ENV APRSD_BRANCH="master"
ENV INSTALL=$HOME/install
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get -y update
RUN apt-get install -y wget gnupg git-core
RUN apt-get install -y apt-utils pkg-config sudo
RUN apt-get install -y python3 python3-pip python3-virtualenv
# Setup Timezone
ENV TZ=US/Eastern
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt-get install -y tzdata
RUN dpkg-reconfigure --frontend noninteractive tzdata
RUN addgroup --gid 1000 $APRS_USER
RUN useradd -m -u 1000 -g 1000 -p $APRS_USER $APRS_USER
USER $APRS_USER
RUN echo "export PATH=\$PATH:\$HOME/.local/bin" >> $HOME/.bashrc
VOLUME ["/config"]
WORKDIR $HOME
RUN mkdir $INSTALL
# install librtlsdr from source
RUN git clone -b $APRSD_BRANCH $APRSD $INSTALL/aprsd
USER root
RUN cd $INSTALL/aprsd && pip3 install .
RUN aprsd sample-config > /config/aprsd.yml
# override this to run another configuration
ENV CONF default
USER $APRS_USER
ADD build/bin/run.sh $HOME/
ENTRYPOINT ["/home/aprs/run.sh"]

View File

@ -29,6 +29,7 @@ import logging
import os
import pprint
import re
import select
import signal
import smtplib
import socket
@ -172,7 +173,6 @@ def install(append, case_insensitive, shell, path):
def setup_connection():
global sock
global sock_file
connected = False
while not connected:
try:
@ -181,7 +181,7 @@ def setup_connection():
sock.connect((CONFIG["aprs"]["host"], 14580))
connected = True
LOG.debug("Connected to server: " + CONFIG["aprs"]["host"])
sock_file = sock.makefile(mode="r")
# sock_file = sock.makefile(mode="r")
# sock_file = sock.makefile(mode='r', encoding=None, errors=None, newline=None)
# sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # disable nagle algorithm
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 512) # buffer size
@ -196,6 +196,7 @@ def setup_connection():
LOG.debug("Logging in to APRS-IS with user '%s'" % user)
msg = "user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__)
sock.send(msg.encode())
return sock
def signal_handler(signal, frame):
@ -225,7 +226,7 @@ def parse_email(msgid, data, server):
# default in case body somehow isn't set below - happened once
body = "* unreadable msg received"
# this uses the last text or html part in the email, phone companies often put content in an attachment
for ( part) in ( msg.get_payload()):
for part in msg.get_payload():
if (
part.get_content_charset() is None
): # or BREAK when we hit a text or html?
@ -394,7 +395,7 @@ def resend_email(count, fromcall):
# reverse lookup of a shortcut
from_addr = shortcuts_inverted[from_addr]
# asterisk indicates a resend
reply = "-" + from_addr + " * " + body.decode(errors='ignore')
reply = "-" + from_addr + " * " + body.decode(errors="ignore")
send_message(fromcall, reply)
msgexists = True
@ -424,7 +425,7 @@ def resend_email(count, fromcall):
def check_email_thread():
global check_email_delay
#LOG.debug("FIXME initial email delay is 10 seconds")
# LOG.debug("FIXME initial email delay is 10 seconds")
check_email_delay = 60
while True:
# LOG.debug("Top of check_email_thread.")
@ -514,17 +515,16 @@ def send_ack_thread(tocall, ack, retry_count):
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
time.sleep(31)
return ()
# end_send_ack_thread
def send_ack(tocall, ack):
LOG.debug("Send ACK({}:{}) to radio.".format(tocall, ack))
retry_count = 3
thread = threading.Thread(
target=send_ack_thread, name="send_ack", args=(tocall, ack, retry_count)
)
thread.start()
return ()
# end send_ack()
@ -701,6 +701,216 @@ def sample_config():
click.echo(yaml.dump(utils.DEFAULT_CONFIG_DICT))
COMMAND_ENVELOPE = {
"email": {"command": "^-.*", "function": "command_email"},
"fortune": {"command": "^[fF]", "function": "command_fortune"},
"location": {"command": "^[lL]", "function": "command_location"},
"weather": {"command": "^[wW]", "function": "command_weather"},
"ping": {"command": "^[pP]", "function": "command_ping"},
"time": {"command": "^[tT]", "function": "command_time"},
}
def command_email(fromcall, message, ack):
LOG.info("Email COMMAND")
searchstring = "^" + CONFIG["ham"]["callsign"] + ".*"
# only I can do email
if re.search(searchstring, fromcall):
# digits only, first one is number of emails to resend
r = re.search("^-([0-9])[0-9]*$", message)
if r is not None:
resend_email(r.group(1), fromcall)
# -user@address.com body of email
elif re.search(r"^-([A-Za-z0-9_\-\.@]+) (.*)", message):
# (same search again)
a = re.search(r"^-([A-Za-z0-9_\-\.@]+) (.*)", message)
if a is not None:
to_addr = a.group(1)
content = a.group(2)
# send recipient link to aprs.fi map
if content == "mapme":
content = "Click for my location: http://aprs.fi/{}".format(
CONFIG["ham"]["callsign"]
)
too_soon = 0
now = time.time()
# see if we sent this msg number recently
if ack in email_sent_dict:
timedelta = now - email_sent_dict[ack]
if timedelta < 300: # five minutes
too_soon = 1
if not too_soon or ack == 0:
send_result = send_email(to_addr, content)
if send_result != 0:
send_message(fromcall, "-" + to_addr + " failed")
else:
# send_message(fromcall, "-" + to_addr + " sent")
if (
len(email_sent_dict) > 98
): # clear email sent dictionary if somehow goes over 100
LOG.debug(
"DEBUG: email_sent_dict is big ("
+ str(len(email_sent_dict))
+ ") clearing out."
)
email_sent_dict.clear()
email_sent_dict[ack] = now
else:
LOG.info(
"Email for message number "
+ ack
+ " recently sent, not sending again."
)
else:
send_message(fromcall, "Bad email address")
return (fromcall, message, ack)
def command_fortune(fromcall, message, ack):
try:
process = subprocess.Popen(
["/usr/games/fortune", "-s", "-n 60"], stdout=subprocess.PIPE
)
reply = process.communicate()[0]
# send_message(fromcall, reply.rstrip())
reply = reply.decode(errors="ignore").rstrip()
except Exception as ex:
reply = "Fortune command failed '{}'".format(ex)
LOG.error(reply)
send_message(fromcall, reply)
return (fromcall, message, ack)
def command_location(fromcall, message, ack):
LOG.info("Location COMMAND")
# get last location of a callsign, get descriptive name from weather service
try:
a = re.search(
r"^.*\s+(.*)", message
) # optional second argument is a callsign to search
if a is not None:
searchcall = a.group(1)
searchcall = searchcall.upper()
else:
searchcall = fromcall # if no second argument, search for calling station
url = (
"http://api.aprs.fi/api/get?name="
+ searchcall
+ "&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
)
response = requests.get(url)
# aprs_data = json.loads(response.read())
aprs_data = json.loads(response.text)
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
try: # altitude not always provided
alt = aprs_data["entries"][0]["altitude"]
except Exception:
alt = 0
altfeet = int(alt * 3.28084)
aprs_lasttime_seconds = aprs_data["entries"][0]["lasttime"]
aprs_lasttime_seconds = aprs_lasttime_seconds.encode(
"ascii", errors="ignore"
) # unicode to ascii
delta_seconds = time.time() - int(aprs_lasttime_seconds)
delta_hours = delta_seconds / 60 / 60
url2 = (
"https://forecast.weather.gov/MapClick.php?lat="
+ str(lat)
+ "&lon="
+ str(lon)
+ "&FcstType=json"
)
response2 = requests.get(url2)
wx_data = json.loads(response2.text)
reply = "{}: {} {}' {},{} {}h ago".format(
searchcall,
wx_data["location"]["areaDescription"],
str(altfeet),
str(alt),
str(lon),
str("%.1f" % round(delta_hours, 1)),
)
# reply = reply.encode('ascii', errors='ignore') # unicode to ascii
send_message(fromcall, reply.rstrip())
except Exception as e:
LOG.debug("Locate failed with: " + "%s" % str(e))
reply = "Unable to find station " + searchcall + ". Sending beacons?"
send_message(fromcall, reply.rstrip())
return (fromcall, message, ack)
def command_weather(fromcall, message, ack):
"""Do weather command and send response."""
LOG.info("WEATHER COMMAND")
try:
url = (
"http://api.aprs.fi/api/get?"
"&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
"&name=%s" % fromcall
)
response = requests.get(url)
# aprs_data = json.loads(response.read())
aprs_data = json.loads(response.text)
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
url2 = (
"https://forecast.weather.gov/MapClick.php?lat=%s"
"&lon=%s&FcstType=json" % (lat, lon)
)
response2 = requests.get(url2)
# wx_data = json.loads(response2.read())
wx_data = json.loads(response2.text)
reply = "%sF(%sF/%sF) %s. %s, %s." % (
wx_data["currentobservation"]["Temp"],
wx_data["data"]["temperature"][0],
wx_data["data"]["temperature"][1],
wx_data["data"]["weather"][0],
wx_data["time"]["startPeriodName"][1],
wx_data["data"]["weather"][1],
)
LOG.debug("reply: " + reply.rstrip())
send_message(fromcall, reply.rstrip())
except Exception as e:
LOG.debug("Weather failed with: " + "%s" % str(e))
reply = "Unable to find you (send beacon?)"
return (fromcall, message, ack)
def command_ping(fromcall, message, ack):
LOG.info("PING COMMAND")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
s = stm.tm_sec
reply = "Pong! " + str(h).zfill(2) + ":" + str(m).zfill(2) + ":" + str(s).zfill(2)
send_message(fromcall, reply.rstrip())
return (fromcall, message, ack)
def command_time(fromcall, message, ack):
LOG.info("TIME COMMAND")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
cur_time = fuzzy(h, m, 1)
reply = "{} ({}:{} PDT) ({})".format(
cur_time, str(h), str(m).rjust(2, "0"), message.rstrip()
)
thread = threading.Thread(
target=send_message, name="send_message", args=(fromcall, reply)
)
thread.start()
return (fromcall, message, ack)
# main() ###
@main.command()
@click.option(
@ -732,13 +942,14 @@ def server(loglevel, quiet, config_file):
LOG.info("APRSD Started version: {}".format(aprsd.__version__))
time.sleep(2)
setup_connection()
client_sock = setup_connection()
valid = validate_email()
if not valid:
LOG.error("Failed to validate email config options")
sys.exit(-1)
user = CONFIG["aprs"]["login"]
LOG.debug("Looking for messages for user '{}'".format(user))
# password = CONFIG["aprs"]["password"]
# LOG.debug("LOGIN to APRSD with user '%s'" % user)
# msg = ("user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__))
@ -751,276 +962,51 @@ def server(loglevel, quiet, config_file):
) # args must be tuple
checkemailthread.start()
LOG.debug("reset empty line counter")
empty_line_rx = 0
read_sockets = [client_sock]
fromcall = message = ack = None
while True:
LOG.debug("Main loop start")
time.sleep(1) # prevent tight loop if something goes awry
line = ""
reconnect = False
message = None
try:
line = sock_file.readline().strip()
LOG.info("APRS-IS: " + line)
# is aprs message to us? not beacon, status, empty line, etc
searchstring = "::%s" % user
if re.search(searchstring, line):
LOG.debug("main: found message addressed to us begin process_message")
(fromcall, message, ack) = process_message(line)
LOG.debug("reset empty line counter")
empty_line_rx = 0
else:
# LOG.debug("Noise: " + line)
# detect closed socket, getting lots of empty lines
if len(line.strip()) == 0:
LOG.debug(
"Zero line length received. Consecutive empty line count: "
+ str(empty_line_rx)
)
empty_line_rx += 1
if empty_line_rx >= 30:
LOG.debug(
"Excessive empty lines received, socket likely CLOSED_WAIT. Reconnecting."
)
empty_line_rx = 0
raise Exception("closed_socket")
continue # line is something we don't care about
readable, writable, exceptional = select.select(read_sockets, [], [])
# ACK (ack##)
if re.search("^ack[0-9]+", message):
LOG.debug("ACK")
# put message_number:1 in dict to record the ack
a = re.search("^ack([0-9]+)", message)
ack_dict.update({int(a.group(1)): 1})
continue # break out of this so we don't ack an ack at the end
for s in readable:
data = s.recv(10240).decode().strip()
if data:
LOG.info("APRS-IS({}): {}".format(len(data), data))
searchstring = "::%s" % user
if re.search(searchstring, data):
LOG.debug(
"main: found message addressed to us begin process_message"
)
(fromcall, message, ack) = process_message(data)
else:
LOG.error("Connection Failed. retrying to connect")
read_sockets.remove(s)
s.close()
time.sleep(2)
client_sock = setup_connection()
read_sockets.append(client_sock)
reconnect = True
# EMAIL (-)
# is email command
elif re.search("^-.*", message):
LOG.debug("EMAIL")
searchstring = "^" + CONFIG["ham"]["callsign"] + ".*"
# only I can do email
if re.search(searchstring, fromcall):
# digits only, first one is number of emails to resend
r = re.search("^-([0-9])[0-9]*$", message)
if r is not None:
resend_email(r.group(1), fromcall)
# -user@address.com body of email
elif re.search(r"^-([A-Za-z0-9_\-\.@]+) (.*)", message):
# (same search again)
a = re.search(r"^-([A-Za-z0-9_\-\.@]+) (.*)", message)
if a is not None:
to_addr = a.group(1)
content = a.group(2)
# send recipient link to aprs.fi map
if content == "mapme":
content = (
"Click for my location: http://aprs.fi/{}".format(
CONFIG["ham"]["callsign"]
)
)
too_soon = 0
now = time.time()
# see if we sent this msg number recently
if ack in email_sent_dict:
timedelta = now - email_sent_dict[ack]
if timedelta < 300: # five minutes
too_soon = 1
if not too_soon or ack == 0:
send_result = send_email(to_addr, content)
if send_result != 0:
send_message(fromcall, "-" + to_addr + " failed")
else:
# send_message(fromcall, "-" + to_addr + " sent")
if (
len(email_sent_dict) > 98
): # clear email sent dictionary if somehow goes over 100
LOG.debug(
"DEBUG: email_sent_dict is big ("
+ str(len(email_sent_dict))
+ ") clearing out."
)
email_sent_dict.clear()
email_sent_dict[ack] = now
else:
LOG.info(
"Email for message number "
+ ack
+ " recently sent, not sending again."
)
else:
send_message(fromcall, "Bad email address")
for s in exceptional:
LOG.error("Connection Failed. retrying to connect")
read_sockets.remove(s)
s.close()
time.sleep(2)
client_sock = setup_connection()
read_sockets.append(client_sock)
reconnect = True
# TIME (t)
elif re.search("^[tT]", message):
LOG.debug("TIME")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
cur_time = fuzzy(h, m, 1)
reply = (
cur_time
+ " ("
+ str(h)
+ ":"
+ str(m).rjust(2, "0")
+ "PDT)"
+ " ("
+ message.rstrip()
+ ")"
)
thread = threading.Thread(
target=send_message, name="send_message", args=(fromcall, reply)
)
thread.start()
# FORTUNE (f)
elif re.search("^[fF]", message):
LOG.debug("FORTUNE")
process = subprocess.Popen(
["/usr/games/fortune", "-s", "-n 60"], stdout=subprocess.PIPE
)
reply = process.communicate()[0]
# send_message(fromcall, reply.rstrip())
reply = reply.decode(errors="ignore")
send_message(fromcall, reply.rstrip())
# PING (p)
elif re.search("^[pP]", message):
LOG.debug("PING")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
s = stm.tm_sec
reply = (
"Pong! "
+ str(h).zfill(2)
+ ":"
+ str(m).zfill(2)
+ ":"
+ str(s).zfill(2)
)
send_message(fromcall, reply.rstrip())
# LOCATION (l) "8 Miles E Auburn CA 1771' 38.91547,-120.99500 0.1h ago"
elif re.search("^[lL]", message):
LOG.debug("LOCATION")
# get last location of a callsign, get descriptive name from weather service
try:
a = re.search(
r"^.*\s+(.*)", message
) # optional second argument is a callsign to search
if a is not None:
searchcall = a.group(1)
searchcall = searchcall.upper()
else:
searchcall = fromcall # if no second argument, search for calling station
url = (
"http://api.aprs.fi/api/get?name="
+ searchcall
+ "&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
)
response = requests.get(url)
# aprs_data = json.loads(response.read())
aprs_data = json.loads(response.text)
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
try: # altitude not always provided
alt = aprs_data["entries"][0]["altitude"]
except Exception:
alt = 0
altfeet = int(alt * 3.28084)
aprs_lasttime_seconds = aprs_data["entries"][0]["lasttime"]
aprs_lasttime_seconds = aprs_lasttime_seconds.encode(
"ascii", errors="ignore"
) # unicode to ascii
delta_seconds = time.time() - int(aprs_lasttime_seconds)
delta_hours = delta_seconds / 60 / 60
url2 = (
"https://forecast.weather.gov/MapClick.php?lat="
+ str(lat)
+ "&lon="
+ str(lon)
+ "&FcstType=json"
)
response2 = requests.get(url2)
# wx_data = json.loads(response2.read())
wx_data = json.loads(response2.text)
reply = (
searchcall
+ ": "
+ wx_data["location"]["areaDescription"]
+ " "
+ str(altfeet)
+ "' "
+ str(lat)
+ ","
+ str(lon)
+ " "
+ str("%.1f" % round(delta_hours, 1))
+ "h ago"
)
# reply = reply.encode('ascii', errors='ignore') # unicode to ascii
send_message(fromcall, reply.rstrip())
except Exception as e:
LOG.debug("Locate failed with: " + "%s" % str(e))
reply = (
"Unable to find station " + searchcall + ". Sending beacons?"
)
send_message(fromcall, reply.rstrip())
# WEATHER (w) "42F(68F/48F) Haze. Tonight, Haze then Chance Rain."
elif re.search("^[wW]", message):
LOG.debug("WEATHER")
# get my last location from aprsis then get weather from
# weather service
try:
url = (
"http://api.aprs.fi/api/get?"
"&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
"&name=%s" % fromcall
)
response = requests.get(url)
# aprs_data = json.loads(response.read())
aprs_data = json.loads(response.text)
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
url2 = (
"https://forecast.weather.gov/MapClick.php?lat=%s"
"&lon=%s&FcstType=json" % (lat, lon)
)
response2 = requests.get(url2)
# wx_data = json.loads(response2.read())
wx_data = json.loads(response2.text)
reply = "%sF(%sF/%sF) %s. %s, %s." % (
wx_data["currentobservation"]["Temp"],
wx_data["data"]["temperature"][0],
wx_data["data"]["temperature"][1],
wx_data["data"]["weather"][0],
wx_data["time"]["startPeriodName"][1],
wx_data["data"]["weather"][1],
)
LOG.debug("reply: " + reply.rstrip())
send_message(fromcall, reply.rstrip())
except Exception as e:
LOG.debug("Weather failed with: " + "%s" % str(e))
reply = "Unable to find you (send beacon?)"
send_message(fromcall, reply)
# USAGE
else:
LOG.debug("USAGE")
reply = "Usage: weather, locate <callsign>, ping, time, fortune"
send_message(fromcall, reply)
# let any threads do their thing, then ack
time.sleep(1)
# send an ack last
LOG.debug("Send ACK to radio.")
send_ack(fromcall, ack)
if reconnect:
# start the loop over
LOG.warning("Starting Main loop over.")
continue
except Exception as e:
LOG.error("Error in mainline loop:")
LOG.exception(e)
LOG.error("%s" % str(e))
if (
str(e) == "closed_socket"
@ -1029,16 +1015,46 @@ def server(loglevel, quiet, config_file):
or str(e) == "Network is unreachable"
):
LOG.error("Attempting to reconnect.")
sock_file.close()
sock.shutdown(0)
sock.close()
setup_connection()
client_sock = setup_connection()
continue
LOG.error("Unexpected error: " + str(e))
LOG.error("Continuing anyway.")
time.sleep(5)
continue # don't know what failed, so wait and then continue main loop again
if not message:
continue
LOG.debug("Process the command. '{}'".format(message))
# ACK (ack##)
# Custom command due to needing to avoid send_ack
if re.search("^ack[0-9]+", message):
LOG.debug("ACK")
# put message_number:1 in dict to record the ack
a = re.search("^ack([0-9]+)", message)
ack_dict.update({int(a.group(1)): 1})
continue # break out of this so we don't ack an ack at the end
# it's not an ack, so try and process user input
found_command = False
for key in COMMAND_ENVELOPE:
if re.search(COMMAND_ENVELOPE[key]["command"], message):
# now call the registered function
funct = COMMAND_ENVELOPE[key]["function"]
(fromcall, message, ack) = globals()[funct](fromcall, message, ack)
found_command = True
if not found_command:
reply = "Usage: {}".format(", ".join(COMMAND_ENVELOPE.keys()))
send_message(fromcall, reply)
# let any threads do their thing, then ack
time.sleep(1)
# send an ack last
send_ack(fromcall, ack)
LOG.debug("Main loop end")
# end while True

12
build/bin/run.sh Executable file
View File

@ -0,0 +1,12 @@
#!/usr/bin/env bash
export PATH=$PATH:$HOME/.local/bin
# check to see if there is a config file
APRSD_CONFIG="/config/aprsd.yml"
if [ ! -e "$APRSD_CONFIG" ]; then
echo "'$APRSD_CONFIG' File does not exist. Creating."
aprsd sample-config > $APRSD_CONFIG
fi
aprsd server -c $APRSD_CONFIG

4
build/build.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
# Use this script to locally build the docker image
docker build --no-cache -t hemna6969/aprsd:latest ..

10
docker-compose.yml Normal file
View File

@ -0,0 +1,10 @@
version: "3"
services:
aprsd:
image: hemna6969/aprsd:latest
container_name: aprsd
volumes:
- /opt/docker/aprsd/config:/config
restart: unless-stopped
environment:
- TZ=America/New_York