1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-10-31 15:07:13 -04:00

Update tox environment to fix formatting python errors

This patch includes lots of changes to tox environment for
automatically detecting pep8 failures, which can cause python2 vs
python3 failures after install.

The following tox commands have been added
tox -efmt-check  - This checks the python syntax and formatting
tox -efmt        - Automatically fixes python syntax formatting that
                   fmt-check complains about.
tox -etype-check - check on types
tox -elint       - flake8 run

This patch also changes where the default config file is located.
The new location is ~/.config/aprsd/aprsd.yml

You can now also specify a custom config file on the command line
with the -c or --config option as well.
This commit is contained in:
Hemna 2020-12-09 14:13:35 -05:00
parent 2bebd83449
commit 53b8f21535
14 changed files with 750 additions and 391 deletions

22
.github/workflows/python.yml vendored Normal file
View File

@ -0,0 +1,22 @@
name: python
on: [push]
jobs:
tox:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [2.7, 3.6, 3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install tox tox-gh-actions
- name: Test with tox
run: tox

View File

@ -14,6 +14,4 @@
import pbr.version
__version__ = pbr.version.VersionInfo(
'aprsd').version_string()
__version__ = pbr.version.VersionInfo("aprsd").version_string()

View File

@ -1,33 +1,27 @@
import argparse
import logging
import socketserver
import sys
import time
import socketserver
from logging.handlers import RotatingFileHandler
from aprsd import utils
# command line args
parser = argparse.ArgumentParser()
parser.add_argument("--loglevel",
default='DEBUG',
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
help="The log level to use for aprsd.log")
parser.add_argument("--quiet",
action='store_true',
help="Don't log to stdout")
parser.add_argument(
"--loglevel",
default="DEBUG",
choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"],
help="The log level to use for aprsd.log",
)
parser.add_argument("--quiet", action="store_true", help="Don't log to stdout")
parser.add_argument("--port",
default=9099,
type=int,
help="The port to listen on .")
parser.add_argument("--ip",
default='127.0.0.1',
help="The IP to listen on ")
parser.add_argument("--port", default=9099, type=int, help="The port to listen on .")
parser.add_argument("--ip", default="127.0.0.1", help="The IP to listen on ")
CONFIG = None
LOG = logging.getLogger('ARPSSERVER')
LOG = logging.getLogger("ARPSSERVER")
# Setup the logging faciility
@ -36,22 +30,19 @@ LOG = logging.getLogger('ARPSSERVER')
def setup_logging(args):
global LOG
levels = {
'CRITICAL': logging.CRITICAL,
'ERROR': logging.ERROR,
'WARNING': logging.WARNING,
'INFO': logging.INFO,
'DEBUG': logging.DEBUG}
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
}
log_level = levels[args.loglevel]
LOG.setLevel(log_level)
log_format = ("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s]"
" %(message)s")
date_format = '%m/%d/%Y %I:%M:%S %p'
log_formatter = logging.Formatter(fmt=log_format,
datefmt=date_format)
fh = RotatingFileHandler('aprs-server.log',
maxBytes=(10248576 * 5),
backupCount=4)
log_format = "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s]" " %(message)s"
date_format = "%m/%d/%Y %I:%M:%S %p"
log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
fh = RotatingFileHandler("aprs-server.log", maxBytes=(10248576 * 5), backupCount=4)
fh.setFormatter(log_formatter)
LOG.addHandler(fh)
@ -62,7 +53,6 @@ def setup_logging(args):
class MyAPRSTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
@ -81,8 +71,8 @@ def main():
CONFIG = utils.parse_config(args)
ip = CONFIG['aprs']['host']
port = CONFIG['aprs']['port']
ip = CONFIG["aprs"]["host"]
port = CONFIG["aprs"]["port"]
LOG.info("Start server listening on %s:%s" % (args.ip, args.port))
with socketserver.TCPServer((ip, port), MyAPRSTCPHandler) as server:

View File

@ -19,37 +19,49 @@ import time
def fuzzy(hour, minute, degree=1):
'''Implements the fuzzy clock.
"""Implements the fuzzy clock.
returns the the string that spells out the time - hour:minute
Supports two degrees of fuzziness. Set with degree = 1 or degree = 2
When degree = 1, time is in quantum of 5 minutes.
When degree = 2, time is in quantum of 15 minutes.'''
When degree = 2, time is in quantum of 15 minutes."""
if degree <= 0 or degree > 2:
print('Please use a degree of 1 or 2. Using fuzziness degree=1')
print("Please use a degree of 1 or 2. Using fuzziness degree=1")
degree = 1
begin = 'It\'s '
begin = "It's "
f0 = 'almost '
f1 = 'exactly '
f2 = 'around '
f0 = "almost "
f1 = "exactly "
f2 = "around "
b0 = ' past '
b1 = ' to '
b0 = " past "
b1 = " to "
hourList = ('One', 'Two', 'Three', 'Four', 'Five', 'Six', 'Seven', 'Eight',
'Nine', 'Ten', 'Eleven', 'Twelve')
hourlist = (
"One",
"Two",
"Three",
"Four",
"Five",
"Six",
"Seven",
"Eight",
"Nine",
"Ten",
"Eleven",
"Twelve",
)
s1 = s2 = s3 = s4 = ''
s1 = s2 = s3 = s4 = ""
base = 5
if degree == 1:
base = 5
val = ('Five', 'Ten', 'Quarter', 'Twenty', 'Twenty-Five', 'Half')
val = ("Five", "Ten", "Quarter", "Twenty", "Twenty-Five", "Half")
elif degree == 2:
base = 15
val = ('Quarter', 'Half')
val = ("Quarter", "Half")
# to find whether we have to use 'almost', 'exactly' or 'around'
dmin = minute % base
@ -74,20 +86,20 @@ def fuzzy(hour, minute, degree=1):
if minute <= base / 2:
# Case like "It's around/exactly Ten"
s2 = s3 = ''
s4 = hourList[hour - 12 - 1]
s2 = s3 = ""
s4 = hourlist[hour - 12 - 1]
elif minute >= 60 - base / 2:
# Case like "It's almost Ten"
s2 = s3 = ''
s4 = hourList[hour - 12]
s2 = s3 = ""
s4 = hourlist[hour - 12]
else:
# Other cases with all words, like "It's around Quarter past One"
if minute > 30:
s3 = b1 # to
s4 = hourList[hour - 12]
s4 = hourlist[hour - 12]
else:
s3 = b0 # past
s4 = hourList[hour - 12 - 1]
s4 = hourlist[hour - 12 - 1]
return begin + s1 + s2 + s3 + s4
@ -102,17 +114,17 @@ def main():
try:
deg = int(sys.argv[1])
except Exception:
print('Please use a degree of 1 or 2. Using fuzziness degree=1')
print("Please use a degree of 1 or 2. Using fuzziness degree=1")
if len(sys.argv) >= 3:
tm = sys.argv[2].split(':')
tm = sys.argv[2].split(":")
try:
h = int(tm[0])
m = int(tm[1])
if h < 0 or h > 23 or m < 0 or m > 59:
raise Exception
except Exception:
print('Bad time entered. Using the system time.')
print("Bad time entered. Using the system time.")
h = stm.tm_hour
m = stm.tm_min
print(fuzzy(h, m, deg))

View File

@ -23,38 +23,36 @@
# python included libs
import datetime
import email
import imaplib
import json
import logging
import os
import socket
import pprint
import re
import signal
import six
import smtplib
import socket
import subprocess
import sys
# import telnetlib
import threading
import time
#import urllib
import requests
from email.mime.text import MIMEText
from logging.handlers import RotatingFileHandler
import click
import click_completion
from email.mime.text import MIMEText
import imapclient
import imaplib
from logging.handlers import RotatingFileHandler
import requests
import six
import yaml
# local imports here
import aprsd
from aprsd.fuzzyclock import fuzzy
from aprsd import utils
from aprsd.fuzzyclock import fuzzy
# setup the global logger
LOG = logging.getLogger('APRSD')
LOG = logging.getLogger("APRSD")
# global for the config yaml
CONFIG = None
@ -100,7 +98,7 @@ message_number = 0
def custom_startswith(string, incomplete):
"""A custom completion match that supports case insensitive matching."""
if os.environ.get('_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE'):
if os.environ.get("_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE"):
string = string.lower()
incomplete = incomplete.lower()
return string.startswith(incomplete)
@ -115,8 +113,11 @@ Available shell types:
\b
%s
Default type: auto
""" % "\n ".join('{:<12} {}'.format(k, click_completion.core.shells[k]) for k in sorted(
click_completion.core.shells.keys()))
""" % "\n ".join(
"{:<12} {}".format(k, click_completion.core.shells[k])
for k in sorted(click_completion.core.shells.keys())
)
@click.group(help=cmd_help)
@click.version_option()
@ -125,24 +126,48 @@ def main():
@main.command()
@click.option('-i', '--case-insensitive/--no-case-insensitive', help="Case insensitive completion")
@click.argument('shell', required=False, type=click_completion.DocumentedChoice(click_completion.core.shells))
@click.option(
"-i", "--case-insensitive/--no-case-insensitive", help="Case insensitive completion"
)
@click.argument(
"shell",
required=False,
type=click_completion.DocumentedChoice(click_completion.core.shells),
)
def show(shell, case_insensitive):
"""Show the click-completion-command completion code"""
extra_env = {'_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE': 'ON'} if case_insensitive else {}
extra_env = (
{"_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE": "ON"}
if case_insensitive
else {}
)
click.echo(click_completion.core.get_code(shell, extra_env=extra_env))
@main.command()
@click.option('--append/--overwrite', help="Append the completion code to the file", default=None)
@click.option('-i', '--case-insensitive/--no-case-insensitive', help="Case insensitive completion")
@click.argument('shell', required=False, type=click_completion.DocumentedChoice(click_completion.core.shells))
@click.argument('path', required=False)
@click.option(
"--append/--overwrite", help="Append the completion code to the file", default=None
)
@click.option(
"-i", "--case-insensitive/--no-case-insensitive", help="Case insensitive completion"
)
@click.argument(
"shell",
required=False,
type=click_completion.DocumentedChoice(click_completion.core.shells),
)
@click.argument("path", required=False)
def install(append, case_insensitive, shell, path):
"""Install the click-completion-command completion"""
extra_env = {'_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE': 'ON'} if case_insensitive else {}
shell, path = click_completion.core.install(shell=shell, path=path, append=append, extra_env=extra_env)
click.echo('%s completion installed in %s' % (shell, path))
extra_env = (
{"_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE": "ON"}
if case_insensitive
else {}
)
shell, path = click_completion.core.install(
shell=shell, path=path, append=append, extra_env=extra_env
)
click.echo("%s completion installed in %s" % (shell, path))
def setup_connection():
@ -153,10 +178,10 @@ def setup_connection():
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(300)
sock.connect((CONFIG['aprs']['host'], 14580))
sock.connect((CONFIG["aprs"]["host"], 14580))
connected = True
LOG.debug("Connected to server: " + CONFIG['aprs']['host'])
sock_file = sock.makefile(mode='r')
LOG.debug("Connected to server: " + CONFIG["aprs"]["host"])
sock_file = sock.makefile(mode="r")
# sock_file = sock.makefile(mode='r', encoding=None, errors=None, newline=None)
# sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # disable nagle algorithm
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 512) # buffer size
@ -166,10 +191,10 @@ def setup_connection():
time.sleep(5)
continue
# os._exit(1)
user = CONFIG['aprs']['login']
password = CONFIG['aprs']['password']
user = CONFIG["aprs"]["login"]
password = CONFIG["aprs"]["password"]
LOG.debug("Logging in to APRS-IS with user '%s'" % user)
msg = ("user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__))
msg = "user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__)
sock.send(msg.encode())
@ -177,11 +202,13 @@ def signal_handler(signal, frame):
LOG.info("Ctrl+C, exiting.")
# sys.exit(0) # thread ignores this
os._exit(0)
# end signal_handler
def parse_email(msgid, data, server):
envelope = data[b'ENVELOPE']
envelope = data[b"ENVELOPE"]
# email address match
# use raw string to avoid invalid escape secquence errors r"string here"
f = re.search(r"([\.\w_-]+@[\.\w_-]+)", str(envelope.from_[0]))
@ -190,15 +217,21 @@ def parse_email(msgid, data, server):
else:
from_addr = "noaddr"
LOG.debug("Got a message from '{}'".format(from_addr))
m = server.fetch([msgid], ['RFC822'])
msg = email.message_from_string(m[msgid][b'RFC822'].decode(errors='ignore'))
m = server.fetch([msgid], ["RFC822"])
msg = email.message_from_string(m[msgid][b"RFC822"].decode(errors="ignore"))
if msg.is_multipart():
text = ""
html = None
# default in case body somehow isn't set below - happened once
body = "* unreadable msg received"
for part in msg.get_payload(): # FIXME this uses the last text or html part in the email, want the first, reverse order somehow?
if part.get_content_charset() is None: # or BREAK when we hit a text or html?
for (
part
) in (
msg.get_payload()
): # FIXME this uses the last text or html part in the email, want the first, reverse order somehow?
if (
part.get_content_charset() is None
): # or BREAK when we hit a text or html?
# We cannot know the character set,
# so return decoded "something"
text = part.get_payload(decode=True)
@ -206,16 +239,15 @@ def parse_email(msgid, data, server):
charset = part.get_content_charset()
if part.get_content_type() == 'text/plain':
if part.get_content_type() == "text/plain":
text = six.text_type(
part.get_payload(decode=True), str(charset),
"ignore").encode('utf8', 'replace')
part.get_payload(decode=True), str(charset), "ignore"
).encode("utf8", "replace")
if part.get_content_type() == 'text/html':
if part.get_content_type() == "text/html":
html = six.text_type(
part.get_payload(decode=True),
str(charset),
"ignore").encode('utf8', 'replace')
part.get_payload(decode=True), str(charset), "ignore"
).encode("utf8", "replace")
if text is not None:
# strip removes white space fore and aft of string
@ -226,49 +258,46 @@ def parse_email(msgid, data, server):
# email.uscc.net sends no charset, blows up unicode function below
if msg.get_content_charset() is None:
text = six.text_type(
msg.get_payload(decode=True),
'US-ASCII',
'ignore').encode('utf8', 'replace')
msg.get_payload(decode=True), "US-ASCII", "ignore"
).encode("utf8", "replace")
else:
text = six.text_type(
msg.get_payload(decode=True),
msg.get_content_charset(),
'ignore').encode('utf8', 'replace')
msg.get_payload(decode=True), msg.get_content_charset(), "ignore"
).encode("utf8", "replace")
body = text.strip()
# FIXED: UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 6: ordinal not in range(128)
# decode with errors='ignore'. be sure to encode it before we return it below, also with errors='ignore'
try:
body = body.decode(errors='ignore')
body = body.decode(errors="ignore")
except Exception as e:
LOG.error("Unicode decode failure: " + str(e))
LOG.error("Unidoce decode failed: " + str(body))
body = "Unreadable unicode msg"
# strip all html tags
body = re.sub('<[^<]+?>', '', body)
body = re.sub("<[^<]+?>", "", body)
# strip CR/LF, make it one line, .rstrip fails at this
body = body.replace("\n", " ").replace("\r", " ")
# ascii might be out of range, so encode it, removing any error characters
body = body.encode(errors='ignore')
body = body.encode(errors="ignore")
return (body, from_addr)
# end parse_email
def _imap_connect():
imap_port = CONFIG['imap'].get('port', 143)
use_ssl = CONFIG['imap'].get('use_ssl', False)
host = CONFIG['imap']['host']
msg = ("{}{}:{}".format(
'TLS ' if use_ssl else '',
host,
imap_port
))
imap_port = CONFIG["imap"].get("port", 143)
use_ssl = CONFIG["imap"].get("use_ssl", False)
host = CONFIG["imap"]["host"]
msg = "{}{}:{}".format("TLS " if use_ssl else "", host, imap_port)
# LOG.debug("Connect to IMAP host {} with user '{}'".
# format(msg, CONFIG['imap']['login']))
try:
server = imapclient.IMAPClient(CONFIG['imap']['host'], port=imap_port,
use_uid=True, ssl=use_ssl)
server = imapclient.IMAPClient(
CONFIG["imap"]["host"], port=imap_port, use_uid=True, ssl=use_ssl
)
except Exception:
LOG.error("Failed to connect IMAP server")
return
@ -276,28 +305,25 @@ def _imap_connect():
# LOG.debug("Connected to IMAP host {}".format(msg))
try:
server.login(CONFIG['imap']['login'], CONFIG['imap']['password'])
server.login(CONFIG["imap"]["login"], CONFIG["imap"]["password"])
except (imaplib.IMAP4.error, Exception) as e:
msg = getattr(e, 'message', repr(e))
msg = getattr(e, "message", repr(e))
LOG.error("Failed to login {}".format(msg))
return
# LOG.debug("Logged in to IMAP, selecting INBOX")
server.select_folder('INBOX')
server.select_folder("INBOX")
return server
def _smtp_connect():
host = CONFIG['smtp']['host']
smtp_port = CONFIG['smtp']['port']
use_ssl = CONFIG['smtp'].get('use_ssl', False)
msg = ("{}{}:{}".format(
'SSL ' if use_ssl else '',
host,
smtp_port
))
LOG.debug("Connect to SMTP host {} with user '{}'".
format(msg, CONFIG['imap']['login']))
host = CONFIG["smtp"]["host"]
smtp_port = CONFIG["smtp"]["port"]
use_ssl = CONFIG["smtp"].get("use_ssl", False)
msg = "{}{}:{}".format("SSL " if use_ssl else "", host, smtp_port)
LOG.debug(
"Connect to SMTP host {} with user '{}'".format(msg, CONFIG["imap"]["login"])
)
try:
if use_ssl:
@ -311,7 +337,7 @@ def _smtp_connect():
LOG.debug("Connected to smtp host {}".format(msg))
try:
server.login(CONFIG['smtp']['login'], CONFIG['smtp']['password'])
server.login(CONFIG["smtp"]["login"], CONFIG["smtp"]["password"])
except Exception:
LOG.error("Couldn't connect to SMTP Server")
return
@ -344,7 +370,7 @@ def resend_email(count, fromcall):
year = date.year
today = "%s-%s-%s" % (day, month, year)
shortcuts = CONFIG['shortcuts']
shortcuts = CONFIG["shortcuts"]
# swap key/value
shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()])
@ -354,7 +380,7 @@ def resend_email(count, fromcall):
LOG.exception("Failed to Connect to IMAP. Cannot resend email ", e)
return
messages = server.search(['SINCE', today])
messages = server.search(["SINCE", today])
# LOG.debug("%d messages received today" % len(messages))
msgexists = False
@ -362,7 +388,7 @@ def resend_email(count, fromcall):
messages.sort(reverse=True)
del messages[int(count) :] # only the latest "count" messages
for message in messages:
for msgid, data in list(server.fetch(message, ['ENVELOPE']).items()):
for msgid, data in list(server.fetch(message, ["ENVELOPE"]).items()):
# one at a time, otherwise order is random
(body, from_addr) = parse_email(msgid, data, server)
# unset seen flag, will stay bold in email client
@ -384,9 +410,11 @@ def resend_email(count, fromcall):
# thinking this is a duplicate message.
# The FT1XDR pretty much ignores the aprs message number in this
# regard. The FTM400 gets it right.
reply = "No new msg %s:%s:%s" % (str(h).zfill(2),
reply = "No new msg %s:%s:%s" % (
str(h).zfill(2),
str(m).zfill(2),
str(s).zfill(2))
str(s).zfill(2),
)
send_message(fromcall, reply)
# check email more often since we're resending one now
@ -412,7 +440,7 @@ def check_email_thread():
check_email_delay += 1
LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds")
shortcuts = CONFIG['shortcuts']
shortcuts = CONFIG["shortcuts"]
# swap key/value
shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()])
@ -431,14 +459,15 @@ def check_email_thread():
if not server:
continue
messages = server.search(['SINCE', today])
messages = server.search(["SINCE", today])
# LOG.debug("{} messages received today".format(len(messages)))
for msgid, data in server.fetch(messages, ['ENVELOPE']).items():
envelope = data[b'ENVELOPE']
for msgid, data in server.fetch(messages, ["ENVELOPE"]).items():
envelope = data[b"ENVELOPE"]
# LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date))
f = re.search(r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)",
str(envelope.from_[0]))
f = re.search(
r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0])
)
if f is not None:
from_addr = f.group(1)
else:
@ -447,10 +476,12 @@ def check_email_thread():
# LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid]))
# if "APRS" not in server.get_flags(msgid)[msgid]:
# in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both
taglist=[x.decode(errors='ignore') for x in server.get_flags(msgid)[msgid]]
taglist = [
x.decode(errors="ignore") for x in server.get_flags(msgid)[msgid]
]
if "APRS" not in taglist:
# if msg not flagged as sent via aprs
server.fetch([msgid], ['RFC822'])
server.fetch([msgid], ["RFC822"])
(body, from_addr) = parse_email(msgid, data, server)
# unset seen flag, will stay bold in email client
server.remove_flags(msgid, [imapclient.SEEN])
@ -459,10 +490,10 @@ def check_email_thread():
# reverse lookup of a shortcut
from_addr = shortcuts_inverted[from_addr]
reply = "-" + from_addr + " " + body.decode(errors='ignore')
send_message(CONFIG['ham']['callsign'], reply)
reply = "-" + from_addr + " " + body.decode(errors="ignore")
send_message(CONFIG["ham"]["callsign"], reply)
# flag message as sent via aprs
server.add_flags(msgid, ['APRS'])
server.add_flags(msgid, ["APRS"])
# unset seen flag, will stay bold in email client
server.remove_flags(msgid, [imapclient.SEEN])
# check email more often since we just received an email
@ -470,16 +501,16 @@ def check_email_thread():
server.logout()
# end check_email()
def send_ack_thread(tocall, ack, retry_count):
tocall = tocall.ljust(9) # pad to nine chars
line = ("{}>APRS::{}:ack{}\n".format(
CONFIG['aprs']['login'], tocall, ack))
line = "{}>APRS::{}:ack{}\n".format(CONFIG["aprs"]["login"], tocall, ack)
for i in range(retry_count, 0, -1):
LOG.info("Sending ack __________________ Tx({})".format(i))
LOG.info("Raw : {}".format(line.rstrip('\n')))
LOG.info("Raw : {}".format(line.rstrip("\n")))
LOG.info("To : {}".format(tocall))
LOG.info("Ack number : {}".format(ack))
sock.send(line.encode())
@ -492,9 +523,9 @@ def send_ack_thread(tocall, ack, retry_count):
def send_ack(tocall, ack):
retry_count = 3
thread = threading.Thread(target=send_ack_thread,
name="send_ack",
args=(tocall, ack, retry_count))
thread = threading.Thread(
target=send_ack_thread, name="send_ack", args=(tocall, ack, retry_count)
)
thread.start()
return ()
# end send_ack()
@ -505,17 +536,22 @@ def send_message_thread(tocall, message, this_message_number, retry_count):
# line = (CONFIG['aprs']['login'] + ">APRS::" + tocall + ":" + message
# + "{" + str(this_message_number) + "\n")
# line = ("{}>APRS::{}:{}{{{}\n".format( CONFIG['aprs']['login'], tocall, message.encode(errors='ignore'), str(this_message_number),))
line = ("{}>APRS::{}:{}{{{}\n".format( CONFIG['aprs']['login'], tocall, message, str(this_message_number),))
line = "{}>APRS::{}:{}{{{}\n".format(
CONFIG["aprs"]["login"],
tocall,
message,
str(this_message_number),
)
for i in range(retry_count, 0, -1):
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
LOG.debug(pprint.pformat(ack_dict))
if ack_dict[this_message_number] != 1:
LOG.info("Sending message_______________ {}(Tx{})"
.format(
str(this_message_number),
str(i)
))
LOG.info("Raw : {}".format(line.rstrip('\n')))
LOG.info(
"Sending message_______________ {}(Tx{})".format(
str(this_message_number), str(i)
)
)
LOG.info("Raw : {}".format(line.rstrip("\n")))
LOG.info("To : {}".format(tocall))
# LOG.info("Message : {}".format(message.encode(errors='ignore')))
LOG.info("Message : {}".format(message))
@ -539,12 +575,14 @@ def send_message(tocall, message):
message_number += 1
if len(ack_dict) > 90:
# empty ack dict if it's really big, could result in key error later
LOG.debug("DEBUG: Length of ack dictionary is big at %s clearing." %
len(ack_dict))
LOG.debug(
"DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict)
)
ack_dict.clear()
LOG.debug(pprint.pformat(ack_dict))
LOG.debug("DEBUG: Cleared ack dictionary, ack_dict length is now %s." %
len(ack_dict))
LOG.debug(
"DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict)
)
ack_dict[message_number] = 0 # clear ack for this message number
tocall = tocall.ljust(9) # pad to nine chars
@ -554,25 +592,26 @@ def send_message(tocall, message):
# feature req: break long ones into two msgs
message = message[:67]
# We all miss George Carlin
message = re.sub('fuck|shit|cunt|piss|cock|bitch', '****', message)
message = re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
thread = threading.Thread(
target=send_message_thread,
name="send_message",
args=(tocall, message, message_number, retry_count))
args=(tocall, message, message_number, retry_count),
)
thread.start()
return ()
# end send_message()
def process_message(line):
f = re.search('^(.*)>', line)
f = re.search("^(.*)>", line)
fromcall = f.group(1)
searchstring = '::%s[ ]*:(.*)' % CONFIG['aprs']['login']
searchstring = "::%s[ ]*:(.*)" % CONFIG["aprs"]["login"]
# verify this, callsign is padded out with spaces to colon
m = re.search(searchstring, line)
fullmessage = m.group(1)
ack_attached = re.search('(.*){([0-9A-Z]+)', fullmessage)
ack_attached = re.search("(.*){([0-9A-Z]+)", fullmessage)
# ack formats include: {1, {AB}, {12
if ack_attached:
# "{##" suffix means radio wants an ack back
@ -599,12 +638,12 @@ def send_email(to_addr, content):
global check_email_delay
LOG.info("Sending Email_________________")
shortcuts = CONFIG['shortcuts']
shortcuts = CONFIG["shortcuts"]
if to_addr in shortcuts:
LOG.info("To : " + to_addr)
to_addr = shortcuts[to_addr]
LOG.info(" (" + to_addr + ")")
subject = CONFIG['ham']['callsign']
subject = CONFIG["ham"]["callsign"]
# content = content + "\n\n(NOTE: reply with one line)"
LOG.info("Subject : " + subject)
LOG.info("Body : " + content)
@ -613,20 +652,20 @@ def send_email(to_addr, content):
check_email_delay = 60
msg = MIMEText(content)
msg['Subject'] = subject
msg['From'] = CONFIG['smtp']['login']
msg['To'] = to_addr
msg["Subject"] = subject
msg["From"] = CONFIG["smtp"]["login"]
msg["To"] = to_addr
server = _smtp_connect()
if server:
try:
server.sendmail(CONFIG['smtp']['login'], [to_addr], msg.as_string())
server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string())
except Exception as e:
msg = getattr(e, 'message', repr(e))
msg = getattr(e, "message", repr(e))
LOG.error("Sendmail Error!!!! '{}'", msg)
server.quit()
return(-1)
return -1
server.quit()
return(0)
return 0
# end send_email
@ -634,24 +673,22 @@ def send_email(to_addr, content):
# to disable logging to stdout, but still log to file
# use the --quiet option on the cmdln
def setup_logging(loglevel, quiet):
global LOG
levels = {
'CRITICAL': logging.CRITICAL,
'ERROR': logging.ERROR,
'WARNING': logging.WARNING,
'INFO': logging.INFO,
'DEBUG': logging.DEBUG}
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
}
log_level = levels[loglevel]
LOG.setLevel(log_level)
log_format = ("%(asctime)s [%(threadName)-12s] [%(levelname)-5.5s]"
" %(message)s")
date_format = '%m/%d/%Y %I:%M:%S %p'
log_formatter = logging.Formatter(fmt=log_format,
datefmt=date_format)
fh = RotatingFileHandler(CONFIG['aprs']['logfile'],
maxBytes=(10248576 * 5),
backupCount=4)
log_format = "%(asctime)s [%(threadName)-12s] [%(levelname)-5.5s]" " %(message)s"
date_format = "%m/%d/%Y %I:%M:%S %p"
log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
fh = RotatingFileHandler(
CONFIG["aprs"]["logfile"], maxBytes=(10248576 * 5), backupCount=4
)
fh.setFormatter(log_formatter)
LOG.addHandler(fh)
@ -661,32 +698,38 @@ def setup_logging(loglevel, quiet):
LOG.addHandler(sh)
@main.command()
def sample_config():
"""This dumps the config to stdout."""
print(utils.example_config)
click.echo(yaml.dump(utils.DEFAULT_CONFIG_DICT))
# main() ###
@main.command()
@click.option("--loglevel",
default='DEBUG',
@click.option(
"--loglevel",
default="DEBUG",
show_default=True,
type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
case_sensitive=False),
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], case_sensitive=False
),
show_choices=True,
help="The log level to use for aprsd.log")
@click.option("--quiet",
is_flag=True,
default=False,
help="Don't log to stdout")
def server(loglevel, quiet):
help="The log level to use for aprsd.log",
)
@click.option("--quiet", is_flag=True, default=False, help="Don't log to stdout")
@click.option(
"-c",
"--config",
"config_file",
show_default=True,
default=utils.DEFAULT_CONFIG_FILE,
help="The aprsd config file to use for options.",
)
def server(loglevel, quiet, config_file):
"""Start the aprsd server process."""
global CONFIG
CONFIG = utils.parse_config()
CONFIG = utils.parse_config(config_file)
signal.signal(signal.SIGINT, signal_handler)
setup_logging(loglevel, quiet)
LOG.info("APRSD Started version: {}".format(aprsd.__version__))
@ -698,19 +741,17 @@ def server(loglevel, quiet):
LOG.error("Failed to validate email config options")
sys.exit(-1)
user = CONFIG['aprs']['login']
password = CONFIG['aprs']['password']
user = CONFIG["aprs"]["login"]
# password = CONFIG["aprs"]["password"]
# LOG.debug("LOGIN to APRSD with user '%s'" % user)
# msg = ("user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__))
# sock.send(msg.encode())
time.sleep(2)
checkemailthread = threading.Thread(target=check_email_thread,
name="check_email",
args=() ) # args must be tuple
checkemailthread = threading.Thread(
target=check_email_thread, name="check_email", args=()
) # args must be tuple
checkemailthread.start()
LOG.debug("reset empty line counter")
@ -724,7 +765,7 @@ def server(loglevel, quiet):
line = sock_file.readline().strip()
LOG.info("APRS-IS: " + line)
# is aprs message to us? not beacon, status, empty line, etc
searchstring = '::%s' % user
searchstring = "::%s" % user
if re.search(searchstring, line):
LOG.debug("main: found message addressed to us begin process_message")
(fromcall, message, ack) = process_message(line)
@ -734,31 +775,36 @@ def server(loglevel, quiet):
# LOG.debug("Noise: " + line)
# detect closed socket, getting lots of empty lines
if len(line.strip()) == 0:
LOG.debug("Zero line length received. Consecutive empty line count: " + str(empty_line_rx))
LOG.debug(
"Zero line length received. Consecutive empty line count: "
+ str(empty_line_rx)
)
empty_line_rx += 1
if empty_line_rx >= 30:
LOG.debug("Excessive empty lines received, socket likely CLOSED_WAIT. Reconnecting.")
LOG.debug(
"Excessive empty lines received, socket likely CLOSED_WAIT. Reconnecting."
)
empty_line_rx = 0
raise Exception("closed_socket")
continue # line is something we don't care about
# ACK (ack##)
if re.search('^ack[0-9]+', message):
if re.search("^ack[0-9]+", message):
LOG.debug("ACK")
# put message_number:1 in dict to record the ack
a = re.search('^ack([0-9]+)', message)
a = re.search("^ack([0-9]+)", message)
ack_dict.update({int(a.group(1)): 1})
continue # break out of this so we don't ack an ack at the end
# EMAIL (-)
# is email command
elif re.search('^-.*', message):
elif re.search("^-.*", message):
LOG.debug("EMAIL")
searchstring = '^' + CONFIG['ham']['callsign'] + '.*'
searchstring = "^" + CONFIG["ham"]["callsign"] + ".*"
# only I can do email
if re.search(searchstring, fromcall):
# digits only, first one is number of emails to resend
r = re.search('^-([0-9])[0-9]*$', message)
r = re.search("^-([0-9])[0-9]*$", message)
if r is not None:
resend_email(r.group(1), fromcall)
# -user@address.com body of email
@ -769,16 +815,18 @@ def server(loglevel, quiet):
to_addr = a.group(1)
content = a.group(2)
# send recipient link to aprs.fi map
if content == 'mapme':
if content == "mapme":
content = (
"Click for my location: http://aprs.fi/{}".
format(CONFIG['ham']['callsign']))
"Click for my location: http://aprs.fi/{}".format(
CONFIG["ham"]["callsign"]
)
)
too_soon = 0
now = time.time()
# see if we sent this msg number recently
if ack in email_sent_dict:
timedelta = now - email_sent_dict[ack]
if (timedelta < 300): # five minutes
if timedelta < 300: # five minutes
too_soon = 1
if not too_soon or ack == 0:
send_result = send_email(to_addr, content)
@ -786,113 +834,175 @@ def server(loglevel, quiet):
send_message(fromcall, "-" + to_addr + " failed")
else:
# send_message(fromcall, "-" + to_addr + " sent")
if len(email_sent_dict) > 98: # clear email sent dictionary if somehow goes over 100
LOG.debug("DEBUG: email_sent_dict is big (" + str(len(email_sent_dict)) + ") clearing out.")
if (
len(email_sent_dict) > 98
): # clear email sent dictionary if somehow goes over 100
LOG.debug(
"DEBUG: email_sent_dict is big ("
+ str(len(email_sent_dict))
+ ") clearing out."
)
email_sent_dict.clear()
email_sent_dict[ack] = now
else:
LOG.info("Email for message number " + ack + " recently sent, not sending again.")
LOG.info(
"Email for message number "
+ ack
+ " recently sent, not sending again."
)
else:
send_message(fromcall, "Bad email address")
# TIME (t)
elif re.search('^[tT]', message):
elif re.search("^[tT]", message):
LOG.debug("TIME")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
cur_time = fuzzy(h, m, 1)
reply = cur_time + " (" + str(h) + ":" + str(m).rjust(2, '0') + "PDT)" + " (" + message.rstrip() + ")"
thread = threading.Thread(target=send_message,
name="send_message",
args=(fromcall, reply))
reply = (
cur_time
+ " ("
+ str(h)
+ ":"
+ str(m).rjust(2, "0")
+ "PDT)"
+ " ("
+ message.rstrip()
+ ")"
)
thread = threading.Thread(
target=send_message, name="send_message", args=(fromcall, reply)
)
thread.start()
# FORTUNE (f)
elif re.search('^[fF]', message):
elif re.search("^[fF]", message):
LOG.debug("FORTUNE")
process = subprocess.Popen(['/usr/games/fortune', '-s', '-n 60'], stdout=subprocess.PIPE)
process = subprocess.Popen(
["/usr/games/fortune", "-s", "-n 60"], stdout=subprocess.PIPE
)
reply = process.communicate()[0]
# send_message(fromcall, reply.rstrip())
reply = reply.decode(errors='ignore')
reply = reply.decode(errors="ignore")
send_message(fromcall, reply.rstrip())
# PING (p)
elif re.search('^[pP]', message):
elif re.search("^[pP]", message):
LOG.debug("PING")
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
s = stm.tm_sec
reply = "Pong! " + str(h).zfill(2) + ":" + str(m).zfill(2) + ":" + str(s).zfill(2)
reply = (
"Pong! "
+ str(h).zfill(2)
+ ":"
+ str(m).zfill(2)
+ ":"
+ str(s).zfill(2)
)
send_message(fromcall, reply.rstrip())
# LOCATION (l) "8 Miles E Auburn CA 1771' 38.91547,-120.99500 0.1h ago"
elif re.search('^[lL]', message):
elif re.search("^[lL]", message):
LOG.debug("LOCATION")
# get last location of a callsign, get descriptive name from weather service
try:
a = re.search(r"^.*\s+(.*)", message) # optional second argument is a callsign to search
a = re.search(
r"^.*\s+(.*)", message
) # optional second argument is a callsign to search
if a is not None:
searchcall = a.group(1)
searchcall = searchcall.upper()
else:
searchcall = fromcall # if no second argument, search for calling station
url = "http://api.aprs.fi/api/get?name=" + searchcall + "&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
#response = urllib.urlopen(url)
url = (
"http://api.aprs.fi/api/get?name="
+ searchcall
+ "&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
)
response = requests.get(url)
# aprs_data = json.loads(response.read())
aprs_data = json.loads(response.text)
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
try: # altitude not always provided
alt = aprs_data['entries'][0]['altitude']
alt = aprs_data["entries"][0]["altitude"]
except Exception:
alt = 0
altfeet = int(alt * 3.28084)
aprs_lasttime_seconds = aprs_data['entries'][0]['lasttime']
aprs_lasttime_seconds = aprs_lasttime_seconds.encode('ascii', errors='ignore') # unicode to ascii
aprs_lasttime_seconds = aprs_data["entries"][0]["lasttime"]
aprs_lasttime_seconds = aprs_lasttime_seconds.encode(
"ascii", errors="ignore"
) # unicode to ascii
delta_seconds = time.time() - int(aprs_lasttime_seconds)
delta_hours = delta_seconds / 60 / 60
url2 = "https://forecast.weather.gov/MapClick.php?lat=" + str(lat) + "&lon=" + str(lon) + "&FcstType=json"
#response2 = urllib.urlopen(url2)
url2 = (
"https://forecast.weather.gov/MapClick.php?lat="
+ str(lat)
+ "&lon="
+ str(lon)
+ "&FcstType=json"
)
response2 = requests.get(url2)
# wx_data = json.loads(response2.read())
wx_data = json.loads(response2.text)
reply = searchcall + ": " + wx_data['location']['areaDescription'] + " " + str(altfeet) + "' " + str(lat) + "," + str(lon) + " " + str("%.1f" % round(delta_hours, 1)) + "h ago"
reply = (
searchcall
+ ": "
+ wx_data["location"]["areaDescription"]
+ " "
+ str(altfeet)
+ "' "
+ str(lat)
+ ","
+ str(lon)
+ " "
+ str("%.1f" % round(delta_hours, 1))
+ "h ago"
)
# reply = reply.encode('ascii', errors='ignore') # unicode to ascii
send_message(fromcall, reply.rstrip())
except Exception as e:
LOG.debug("Locate failed with: " + "%s" % str(e))
reply = "Unable to find station " + searchcall + ". Sending beacons?"
reply = (
"Unable to find station " + searchcall + ". Sending beacons?"
)
send_message(fromcall, reply.rstrip())
# WEATHER (w) "42F(68F/48F) Haze. Tonight, Haze then Chance Rain."
elif re.search('^[wW]', message):
elif re.search("^[wW]", message):
LOG.debug("WEATHER")
# get my last location from aprsis then get weather from
# weather service
try:
url = ("http://api.aprs.fi/api/get?" "&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json" "&name=%s" % fromcall)
#response = urllib.urlopen(url)
url = (
"http://api.aprs.fi/api/get?"
"&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
"&name=%s" % fromcall
)
response = requests.get(url)
# aprs_data = json.loads(response.read())
aprs_data = json.loads(response.text)
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
url2 = ("https://forecast.weather.gov/MapClick.php?lat=%s" "&lon=%s&FcstType=json" % (lat, lon))
#response2 = urllib.urlopen(url2)
lat = aprs_data["entries"][0]["lat"]
lon = aprs_data["entries"][0]["lng"]
url2 = (
"https://forecast.weather.gov/MapClick.php?lat=%s"
"&lon=%s&FcstType=json" % (lat, lon)
)
response2 = requests.get(url2)
# wx_data = json.loads(response2.read())
wx_data = json.loads(response2.text)
reply = "%sF(%sF/%sF) %s. %s, %s." % (
wx_data['currentobservation']['Temp'],
wx_data['data']['temperature'][0],
wx_data['data']['temperature'][1],
wx_data['data']['weather'][0],
wx_data['time']['startPeriodName'][1],
wx_data['data']['weather'][1])
wx_data["currentobservation"]["Temp"],
wx_data["data"]["temperature"][0],
wx_data["data"]["temperature"][1],
wx_data["data"]["weather"][0],
wx_data["time"]["startPeriodName"][1],
wx_data["data"]["weather"][1],
)
LOG.debug("reply: " + reply.rstrip())
send_message(fromcall, reply.rstrip())
except Exception as e:
@ -915,7 +1025,12 @@ def server(loglevel, quiet):
except Exception as e:
LOG.error("Error in mainline loop:")
LOG.error("%s" % str(e))
if ( str(e) == "closed_socket" or str(e) == "timed out" or str(e) == "Temporary failure in name resolution" or str(e) == "Network is unreachable"):
if (
str(e) == "closed_socket"
or str(e) == "timed out"
or str(e) == "Temporary failure in name resolution"
or str(e) == "Network is unreachable"
):
LOG.error("Attempting to reconnect.")
sock_file.close()
sock.shutdown(0)

View File

@ -1,40 +1,44 @@
"""Utilities and helper functions."""
import logging
import errno
import os
import sys
import click
import yaml
# an example of what should be in the ~/.aprsd/config.yml
example_config = '''
ham:
callsign: KFART
DEFAULT_CONFIG_DICT = {
"ham": {"callsign": "KFART"},
"aprs": {
"login": "someusername",
"password": "somepassword",
"host": "noam.aprs2.net",
"port": 14580,
"logfile": "/tmp/arsd.log",
},
"shortcuts": {
"aa": "5551239999@vtext.com",
"cl": "craiglamparter@somedomain.org",
"wb": "555309@vtext.com",
},
"smtp": {
"login": "something",
"password": "some lame password",
"host": "imap.gmail.com",
"port": 465,
"use_ssl": False,
},
"imap": {
"login": "imapuser",
"password": "something here too",
"host": "imap.gmail.com",
"port": 993,
"use_ssl": True,
},
}
aprs:
login: someusername
password: password
host: noam.aprs2.net
port: 14580
logfile: /tmp/aprsd.log
shortcuts:
'aa': '5551239999@vtext.com'
'cl': 'craiglamparter@somedomain.org'
'wb': '555309@vtext.com'
smtp:
login: something
password: some lame password
host: imap.gmail.com
port: 465
imap:
login: imapuser
password: something dumb
host: imap.gmail.com
'''
log = logging.getLogger('APRSD')
DEFAULT_CONFIG_FILE = "~/.config/aprsd/aprsd.yml"
def env(*vars, **kwargs):
@ -45,20 +49,56 @@ def env(*vars, **kwargs):
value = os.environ.get(v, None)
if value:
return value
return kwargs.get('default', '')
return kwargs.get("default", "")
def get_config():
"""This tries to read the yaml config from ~/.aprsd/config.yml."""
config_file = os.path.expanduser("~/.aprsd/config.yml")
if os.path.exists(config_file):
with open(config_file, "r") as stream:
def mkdir_p(path):
"""Make directory and have it work in py2 and py3."""
try:
os.makedirs(path)
except OSError as exc: # Python >= 2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
def create_default_config():
"""Create a default config file."""
# make sure the directory location exists
config_file_expanded = os.path.expanduser(DEFAULT_CONFIG_FILE)
config_dir = os.path.dirname(config_file_expanded)
if not os.path.exists(config_dir):
click.echo("Config dir '{}' doesn't exist, creating.".format(config_dir))
mkdir_p(config_dir)
with open(config_file_expanded, "w+") as cf:
yaml.dump(DEFAULT_CONFIG_DICT, cf)
def get_config(config_file):
"""This tries to read the yaml config from <config_file>."""
config_file_expanded = os.path.expanduser(config_file)
if os.path.exists(config_file_expanded):
with open(config_file_expanded, "r") as stream:
config = yaml.load(stream, Loader=yaml.FullLoader)
return config
else:
log.critical("%s is missing, please create config file" % config_file)
print("\nCopy to ~/.aprsd/config.yml and edit\n\nSample config:\n %s"
% example_config)
if config_file == DEFAULT_CONFIG_FILE:
click.echo(
"{} is missing, creating config file".format(config_file_expanded)
)
create_default_config()
msg = (
"Default config file created at {}. Please edit with your "
"settings.".format(config_file)
)
click.echo(msg)
else:
# The user provided a config file path different from the
# Default, so we won't try and create it, just bitch and bail.
msg = "Custom config file '{}' is missing.".format(config_file)
click.echo(msg)
sys.exit(-1)
@ -66,42 +106,55 @@ def get_config():
# and consume the settings.
# If the required params don't exist,
# it will look in the environment
def parse_config():
def parse_config(config_file):
# for now we still use globals....ugh
global CONFIG, LOG
global CONFIG
def fail(msg):
LOG.critical(msg)
click.echo(msg)
sys.exit(-1)
def check_option(config, section, name=None, default=None):
def check_option(config, section, name=None, default=None, default_fail=None):
if section in config:
if name and name not in config[section]:
if not default:
fail("'%s' was not in '%s' section of config file" %
(name, section))
fail(
"'%s' was not in '%s' section of config file" % (name, section)
)
else:
config[section][name] = default
else:
if (
default_fail
and name in config[section]
and config[section][name] == default_fail
):
# We have to fail and bail if the user hasn't edited
# this config option.
fail("Config file needs to be edited from provided defaults.")
else:
fail("'%s' section wasn't in config file" % section)
return config
# Now read the ~/.aprds/config.yml
config = get_config()
check_option(config, 'shortcuts')
check_option(config, 'ham', 'callsign')
check_option(config, 'aprs', 'login')
check_option(config, 'aprs', 'password')
check_option(config, 'aprs', 'host')
check_option(config, 'aprs', 'port')
config = check_option(config, 'aprs', 'logfile', './aprsd.log')
check_option(config, 'imap', 'host')
check_option(config, 'imap', 'login')
check_option(config, 'imap', 'password')
check_option(config, 'smtp', 'host')
check_option(config, 'smtp', 'port')
check_option(config, 'smtp', 'login')
check_option(config, 'smtp', 'password')
config = get_config(config_file)
check_option(config, "shortcuts")
# special check here to make sure user has edited the config file
# and changed the ham callsign
check_option(
config, "ham", "callsign", default_fail=DEFAULT_CONFIG_DICT["ham"]["callsign"]
)
check_option(config, "aprs", "login")
check_option(config, "aprs", "password")
check_option(config, "aprs", "host")
check_option(config, "aprs", "port")
check_option(config, "aprs", "logfile", "./aprsd.log")
check_option(config, "imap", "host")
check_option(config, "imap", "login")
check_option(config, "imap", "password")
check_option(config, "smtp", "host")
check_option(config, "smtp", "port")
check_option(config, "smtp", "login")
check_option(config, "smtp", "password")
return config
LOG.info("aprsd config loaded")

9
dev-requirements.in Normal file
View File

@ -0,0 +1,9 @@
tox
pytest
pytest-cov
mypy
flake8
pep8-naming
black
isort
Sphinx

60
dev-requirements.txt Normal file
View File

@ -0,0 +1,60 @@
#
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile dev-requirements.in
#
alabaster==0.7.12 # via sphinx
appdirs==1.4.4 # via black, virtualenv
attrs==20.3.0 # via pytest
babel==2.9.0 # via sphinx
black==20.8b1 # via -r dev-requirements.in
certifi==2020.12.5 # via requests
chardet==3.0.4 # via requests
coverage==5.3 # via pytest-cov
distlib==0.3.1 # via virtualenv
docutils==0.16 # via sphinx
filelock==3.0.12 # via tox, virtualenv
flake8-polyfill==1.0.2 # via pep8-naming
flake8==3.8.4 # via -r dev-requirements.in, flake8-polyfill
idna==2.10 # via requests
imagesize==1.2.0 # via sphinx
iniconfig==1.1.1 # via pytest
isort==5.6.4 # via -r dev-requirements.in
jinja2==2.11.2 # via sphinx
markupsafe==1.1.1 # via jinja2
mccabe==0.6.1 # via flake8
mypy-extensions==0.4.3 # via black, mypy
mypy==0.790 # via -r dev-requirements.in
packaging==20.7 # via pytest, sphinx, tox
pathspec==0.8.1 # via black
pep8-naming==0.11.1 # via -r dev-requirements.in
pluggy==0.13.1 # via pytest, tox
py==1.9.0 # via pytest, tox
pycodestyle==2.6.0 # via flake8
pyflakes==2.2.0 # via flake8
pygments==2.7.3 # via sphinx
pyparsing==2.4.7 # via packaging
pytest-cov==2.10.1 # via -r dev-requirements.in
pytest==6.1.2 # via -r dev-requirements.in, pytest-cov
pytz==2020.4 # via babel
regex==2020.11.13 # via black
requests==2.25.0 # via sphinx
six==1.15.0 # via tox, virtualenv
snowballstemmer==2.0.0 # via sphinx
sphinx==3.3.1 # via -r dev-requirements.in
sphinxcontrib-applehelp==1.0.2 # via sphinx
sphinxcontrib-devhelp==1.0.2 # via sphinx
sphinxcontrib-htmlhelp==1.0.3 # via sphinx
sphinxcontrib-jsmath==1.0.1 # via sphinx
sphinxcontrib-qthelp==1.0.3 # via sphinx
sphinxcontrib-serializinghtml==1.1.4 # via sphinx
toml==0.10.2 # via black, pytest, tox
tox==3.20.1 # via -r dev-requirements.in
typed-ast==1.4.1 # via black, mypy
typing-extensions==3.7.4.3 # via black, mypy
urllib3==1.26.2 # via requests
virtualenv==20.2.2 # via tox
# The following packages are considered to be unsafe in a requirements file:
# setuptools

View File

@ -4,3 +4,4 @@ imapclient
pbr
pyyaml
six
requests

View File

@ -24,6 +24,4 @@ try:
except ImportError:
pass
setuptools.setup(
setup_requires=['pbr'],
pbr=True)
setuptools.setup(setup_requires=["pbr"], pbr=True)

View File

@ -0,0 +1,3 @@
flake8
pytest
mock

0
tests/__init__.py Normal file
View File

23
tests/test_main.py Normal file
View File

@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
import sys
import unittest
import pytest
from aprsd import main
if sys.version_info >= (3, 2):
from unittest import mock
else:
import mock
class testMain(unittest.TestCase):
@mock.patch("aprsd.main._imap_connect")
@mock.patch("aprsd.main._smtp_connect")
def test_validate_email(self, imap_mock, smtp_mock):
"""Test to make sure we fail."""
imap_mock.return_value = None
smtp_mock.return_value = {"smaiof": "fire"}
main.validate_email()

89
tox.ini
View File

@ -1,26 +1,51 @@
[tox]
minversion = 1.6
minversion = 2.9.0
skipdist = True
envlist = py27,py36,py37,fast8,pep8,cover,docs
skip_missing_interpreters = true
envlist = py{27,36,37,38},pep8,fmt-check
# Activate isolated build environment. tox will use a virtual environment
# to build a source distribution from the source tree. For build tools and
# arguments use the pyproject.toml file as specified in PEP-517 and PEP-518.
isolated_build = true
[testenv]
setenv = VIRTUAL_ENV={envdir}
usedevelop = True
install_command = pip install {opts} {packages}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
-r{toxinidir}/dev-requirements.txt
commands =
pytest aprsd/main.py {posargs}
# Use -bb to enable BytesWarnings as error to catch str/bytes misuse.
# Use -Werror to treat warnings as errors.
# {envpython} -bb -Werror -m pytest \
# --cov="{envsitepackagesdir}/aprsd" --cov-report=html --cov-report=term {posargs}
{envpython} -bb -Werror -m pytest {posargs}
[testenv:cover]
[testenv:py27]
setenv = VIRTUAL_ENV={envdir}
usedevelop = True
install_command = pip install {opts} {packages}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements-py2.txt
commands =
pytest --cov=aprsd
# Use -bb to enable BytesWarnings as error to catch str/bytes misuse.
# Use -Werror to treat warnings as errors.
# {envpython} -bb -Werror -m pytest \
# --cov="{envsitepackagesdir}/aprsd" --cov-report=html --cov-report=term {posargs}
{envpython} -bb -Werror -m pytest
[testenv:docs]
deps = -r{toxinidir}/test-requirements.txt
commands = sphinx-build -b html docs/source docs/html
[testenv:pep8-27]
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements-py2.txt
commands =
flake8 {posargs} aprsd
[testenv:pep8]
commands =
flake8 {posargs} aprsd
@ -33,7 +58,57 @@ commands =
{toxinidir}/tools/fast8.sh
passenv = FAST8_NUM_COMMITS
[testenv:lint]
skip_install = true
deps =
-r{toxinidir}/dev-requirements.txt
commands =
flake8 aprsd
[flake8]
max-line-length = 99
show-source = True
ignore = E713,E501
ignore = E713,E501,W503
extend-ignore = E203,W503
extend-exclude = venv
exclude = .venv,.git,.tox,dist,doc,.ropeproject
# This is the configuration for the tox-gh-actions plugin for GitHub Actions
# https://github.com/ymyzk/tox-gh-actions
# This section is not needed if not using GitHub Actions for CI.
[gh-actions]
python =
2.7: py27, pep8-27
3.6: py36, pep8, fmt-check
3.7: py38, pep8, fmt-check
3.8: py38, pep8, fmt-check, type-check, docs
3.9: py39
[testenv:fmt]
# This will reformat your code to comply with pep8
# and standard formatting
skip_install = true
deps =
-r{toxinidir}/dev-requirements.txt
commands =
isort .
black .
[testenv:fmt-check]
# Runs a check only on code formatting.
# you can fix imports by running isort standalone
# you can fix code formatting by running black standalone
skip_install = true
deps =
-r{toxinidir}/dev-requirements.txt
commands =
isort --check-only .
black --check .
[testenv:type-check]
skip_install = true
deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev-requirements.txt
commands =
mypy aprsd