1
0
mirror of https://github.com/craigerl/aprsd.git synced 2024-11-21 07:41:49 -05:00

Added tox support

This patch adds support for tox.  Tox is used to run various
python compliance tests.  This enables pep8 tests, as well as python2
and python3 compatibility as well as coverage and documentation
building.
This commit is contained in:
Walter A. Boring IV 2019-05-15 20:12:59 +00:00
parent 28e062bd44
commit d1a50c6559
6 changed files with 364 additions and 220 deletions

View File

@ -17,98 +17,105 @@
import sys
import time
def fuzzy(hour, minute, degree=1):
'''Implements the fuzzy clock.
returns the the string that spells out the time - hour:minute
Supports two degrees of fuzziness. Set with degree = 1 or degree = 2
When degree = 1, time is in quantum of 5 minutes.
When degree = 2, time is in quantum of 15 minutes.'''
'''Implements the fuzzy clock.
returns the the string that spells out the time - hour:minute
Supports two degrees of fuzziness. Set with degree = 1 or degree = 2
When degree = 1, time is in quantum of 5 minutes.
When degree = 2, time is in quantum of 15 minutes.'''
if degree<=0 or degree>2:
print 'Please use a degree of 1 or 2. Using fuzziness degree=1'
degree = 1
if degree <= 0 or degree > 2:
print('Please use a degree of 1 or 2. Using fuzziness degree=1')
degree = 1
begin = 'It\'s '
begin = 'It\'s '
f0 = 'almost '
f1 = 'exactly '
f2 = 'around '
f0 = 'almost '
f1 = 'exactly '
f2 = 'around '
b0 = ' past '
b1 = ' to '
b0 = ' past '
b1 = ' to '
hourList = ('One', 'Two', 'Three', 'Four', 'Five', 'Six', 'Seven', 'Eight', 'Nine', 'Ten', 'Eleven', 'Twelve')
hourList = ('One', 'Two', 'Three', 'Four', 'Five', 'Six', 'Seven', 'Eight',
'Nine', 'Ten', 'Eleven', 'Twelve')
s1 = s2 = s3 = s4 = ''
base = 5
s1 = s2 = s3 = s4 = ''
base = 5
if degree == 1:
base = 5
val = ('Five', 'Ten', 'Quarter', 'Twenty', 'Twenty-Five', 'Half')
elif degree == 2:
base = 15
val = ('Quarter', 'Half')
if degree == 1:
base = 5
val = ('Five', 'Ten', 'Quarter', 'Twenty', 'Twenty-Five', 'Half')
elif degree == 2:
base = 15
val = ('Quarter', 'Half')
dmin = minute % base # to find whether we have to use 'almost', 'exactly' or 'around'
if minute > 30:
pos = int((60 - minute) / base) #position in the tuple 'val'
else:
pos = int(minute / base)
# to find whether we have to use 'almost', 'exactly' or 'around'
dmin = minute % base
if minute > 30:
pos = int((60 - minute) / base) # position in the tuple 'val'
else:
pos = int(minute / base)
if dmin == 0:
s1 = f1
pos = pos - 1
elif dmin <= base/2:
s1 = f2
if minute < 30:
pos = pos-1
else:
s1 = f0
if minute > 30:
pos = pos -1
if dmin == 0:
s1 = f1
pos = pos - 1
elif dmin <= base/2:
s1 = f2
if minute < 30:
pos = pos-1
else:
s1 = f0
if minute > 30:
pos = pos - 1
s2 = val[pos]
s2 = val[pos]
if minute <= base/2: # Case like "It's around/exactly Ten"
s2 = s3 = ''
s4 = hourList[hour - 12 - 1]
elif minute >= 60-base/2: # Case like "It's almost Ten"
s2 = s3 = ''
s4 = hourList[hour - 12]
else: # Other cases with all words, like "It's around Quarter past One"
if minute>30:
s3 = b1 # to
s4 = hourList[hour - 12]
else:
s3 = b0 # past
s4 = hourList[hour - 12 - 1]
if minute <= base / 2:
# Case like "It's around/exactly Ten"
s2 = s3 = ''
s4 = hourList[hour - 12 - 1]
elif minute >= 60 - base / 2:
# Case like "It's almost Ten"
s2 = s3 = ''
s4 = hourList[hour - 12]
else:
# Other cases with all words, like "It's around Quarter past One"
if minute > 30:
s3 = b1 # to
s4 = hourList[hour - 12]
else:
s3 = b0 # past
s4 = hourList[hour - 12 - 1]
return begin + s1 + s2 + s3 + s4
return begin + s1 + s2 + s3 + s4
def main():
deg = 1
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
deg = 1
stm = time.localtime()
h = stm.tm_hour
m = stm.tm_min
if len(sys.argv)>=2:
try:
deg = int(sys.argv[1])
except:
print 'Please use a degree of 1 or 2. Using fuzziness degree=1'
if len(sys.argv) >= 2:
try:
deg = int(sys.argv[1])
except Exception:
print('Please use a degree of 1 or 2. Using fuzziness degree=1')
if len(sys.argv)>=3:
tm = sys.argv[2].split(':')
try:
h = int(tm[0])
m = int(tm[1])
if h<0 or h>23 or m<0 or m>59:
raise Exception
except:
print 'Bad time entered. Using the system time.'
h = stm.tm_hour
m = stm.tm_min
print fuzzy(h, m, deg)
return
if len(sys.argv) >= 3:
tm = sys.argv[2].split(':')
try:
h = int(tm[0])
m = int(tm[1])
if h < 0 or h > 23 or m < 0 or m > 59:
raise Exception
except Exception:
print('Bad time entered. Using the system time.')
h = stm.tm_hour
m = stm.tm_min
print(fuzzy(h, m, deg))
return
main()

View File

@ -46,7 +46,7 @@ from imapclient import IMAPClient, SEEN
# local imports here
from aprsd.fuzzyclock import fuzzy
import utils
from aprsd import utils
# setup the global logger
LOG = logging.getLogger('APRSD')
@ -58,7 +58,7 @@ CONFIG = None
# HOST = "noam.aprs2.net" # north america tier2 servers round robin
# USER = "KM6XXX-9" # callsign of this aprs client with SSID
# PASS = "99999" # google how to generate this
# BASECALLSIGN = "KM6XXX" # callsign of radio in the field to which we send email
# BASECALLSIGN = "KM6XXX" # callsign of radio in the field to send email
# shortcuts = {
# "aa" : "5551239999@vtext.com",
# "cl" : "craiglamparter@somedomain.org",
@ -66,9 +66,18 @@ CONFIG = None
# }
# globals - tell me a better way to update data being used by threads
email_sent_dict = {} # message_number:time combos so we don't resend the same email in five mins {int:int}
ack_dict = {} # message_nubmer:ack combos so we stop sending a message after an ack from radio {int:int}
message_number = 0 # current aprs radio message number, increments for each message we send over rf {int}
# message_number:time combos so we don't resend the same email in
# five mins {int:int}
email_sent_dict = {}
# message_nubmer:ack combos so we stop sending a message after an
# ack from radio {int:int}
ack_dict = {}
# current aprs radio message number, increments for each message we
# send over rf {int}
message_number = 0
# global telnet connection object
tn = None
@ -93,69 +102,82 @@ def setup_connection():
LOG.debug("Setting up telnet connection to '%s:%s'" % (host, port))
try:
tn = telnetlib.Telnet(host, port)
except Exception, e:
except Exception:
LOG.exception("Telnet session failed.")
sys.exit(-1)
def signal_handler(signal, frame):
LOG.info("Ctrl+C, exiting.")
#sys.exit(0) # thread ignores this
# sys.exit(0) # thread ignores this
os._exit(0)
### end signal_handler
# end signal_handler
def parse_email(msgid, data, server):
envelope = data[b'ENVELOPE']
#print('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date ))
f = re.search('([\.\w_-]+@[\.\w_-]+)', str(envelope.from_[0]) ) # email address match
if f is not None:
from_addr = f.group(1)
else:
from_addr = "noaddr"
m = server.fetch([msgid], ['RFC822'])
msg = email.message_from_string(m[msgid]['RFC822'])
if msg.is_multipart():
text = ""
html = None
body = "* unreadable msg received" # default in case body somehow isn't set below - happened once
envelope = data[b'ENVELOPE']
# print('ID:%d "%s" (%s)' %
# (msgid, envelope.subject.decode(), envelope.date))
# email address match
f = re.search('([\.\w_-]+@[\.\w_-]+)', str(envelope.from_[0]))
if f is not None:
from_addr = f.group(1)
else:
from_addr = "noaddr"
m = server.fetch([msgid], ['RFC822'])
msg = email.message_from_string(m[msgid]['RFC822'])
if msg.is_multipart():
text = ""
html = None
# default in case body somehow isn't set below - happened once
body = "* unreadable msg received"
for part in msg.get_payload():
if part.get_content_charset() is None:
# We cannot know the character set, so return decoded "something"
text = part.get_payload(decode=True)
continue
if part.get_content_charset() is None:
# We cannot know the character set, so return decoded "something"
text = part.get_payload(decode=True)
continue
charset = part.get_content_charset()
charset = part.get_content_charset()
if part.get_content_type() == 'text/plain':
text = unicode(part.get_payload(decode=True), str(charset), "ignore").encode('utf8', 'replace')
if part.get_content_type() == 'text/plain':
text = unicode(part.get_payload(decode=True), str(charset),
"ignore").encode('utf8', 'replace')
if part.get_content_type() == 'text/html':
html = unicode(part.get_payload(decode=True), str(charset), "ignore").encode('utf8', 'replace')
if part.get_content_type() == 'text/html':
html = unicode(part.get_payload(decode=True), str(charset),
"ignore").encode('utf8', 'replace')
if text is not None:
body = text.strip() # strip removes white space fore and aft of string
else:
body = html.strip()
else:
text = unicode(msg.get_payload(decode=True), msg.get_content_charset(), 'ignore').encode('utf8', 'replace')
body = text.strip()
if text is not None:
# strip removes white space fore and aft of string
body = text.strip()
else:
body = html.strip()
else:
text = unicode(msg.get_payload(decode=True), msg.get_content_charset(),
'ignore').encode('utf8', 'replace')
body = text.strip()
body = re.sub('<[^<]+?>', '', body) # strip all html tags
body = body.replace("\n", " ").replace("\r", " ") # strip CR/LF, make it one line, .rstrip fails at this
return(body, from_addr)
## end parse_email
# strip all html tags
body = re.sub('<[^<]+?>', '', body)
# strip CR/LF, make it one line, .rstrip fails at this
body = body.replace("\n", " ").replace("\r", " ")
return(body, from_addr)
# end parse_email
def resend_email(count):
date = datetime.datetime.now()
date = datetime.datetime.now()
month = date.strftime("%B")[:3] # Nov, Mar, Apr
day = date.day
year = date.year
today = str(day) + "-" + month + "-" + str(year)
day = date.day
year = date.year
today = "%s-%s-%s" % (day, month, year)
shortcuts = CONFIG['shortcuts']
shortcuts_inverted = dict([[v,k] for k,v in shortcuts.items()]) # swap key/value
# swap key/value
shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()])
LOG.debug("resend_email: Connect to IMAP host '%s' with user '%s'" %
(CONFIG['imap']['host'],
@ -172,12 +194,16 @@ def resend_email(count):
messages.sort(reverse=True)
del messages[int(count):] # only the latest "count" messages
for message in messages:
for msgid, data in list(server.fetch(message, ['ENVELOPE']).items()): # one at a time, otherwise order is random
for msgid, data in list(server.fetch(message, ['ENVELOPE']).items()):
# one at a time, otherwise order is random
(body, from_addr) = parse_email(msgid, data, server)
server.remove_flags(msgid, [SEEN]) # unset seen flag, will stay bold in email client
if from_addr in shortcuts_inverted: # reverse lookup of a shortcut
# unset seen flag, will stay bold in email client
server.remove_flags(msgid, [SEEN])
if from_addr in shortcuts_inverted:
# reverse lookup of a shortcut
from_addr = shortcuts_inverted[from_addr]
reply = "-" + from_addr + " * " + body # asterisk indicates a resend
# asterisk indicates a resend
reply = "-" + from_addr + " * " + body
send_message(fromcall, reply)
msgexists = True
@ -186,29 +212,36 @@ def resend_email(count):
h = stm.tm_hour
m = stm.tm_min
s = stm.tm_sec
# append time as a kind of serial number to prevent FT1XDR from thinking this is a duplicate message.
# The FT1XDR pretty much ignores the aprs message number in this regard. The FTM400 gets it right.
reply = "No new msg " + str(h).zfill(2) + ":" + str(m).zfill(2) + ":" + str(s).zfill(2)
# append time as a kind of serial number to prevent FT1XDR from
# thinking this is a duplicate message.
# The FT1XDR pretty much ignores the aprs message number in this
# regard. The FTM400 gets it right.
reply = "No new msg %s:%s:%s" % (str(h).zfill(2),
str(m).zfill(2),
str(s).zfill(2))
send_message(fromcall, reply)
server.logout()
### end resend_email()
# end resend_email()
def check_email_thread():
# print "Email thread disabled."
# return
LOG.debug("Starting Email thread")
threading.Timer(55, check_email_thread).start() # how do we skip first run?
# how do we skip first run?
threading.Timer(55, check_email_thread).start()
shortcuts = CONFIG['shortcuts']
shortcuts_inverted = dict([[v,k] for k,v in shortcuts.items()]) # swap key/value
# swap key/value
shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()])
date = datetime.datetime.now()
month = date.strftime("%B")[:3] # Nov, Mar, Apr
day = date.day
year = date.year
today = str(day) + "-" + month + "-" + str(year)
day = date.day
year = date.year
today = "%s-%s-%s" % (day, month, year)
LOG.debug("Connect to IMAP host '%s' with user '%s'" %
(CONFIG['imap']['host'],
@ -232,72 +265,82 @@ def check_email_thread():
LOG.debug('ID:%d "%s" (%s)' %
(msgid, envelope.subject.decode(), envelope.date))
f = re.search('([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)',
str(envelope.from_[0]) )
str(envelope.from_[0]))
if f is not None:
from_addr = f.group(1)
else:
from_addr = "noaddr"
if "APRS" not in server.get_flags(msgid)[msgid]: #if msg not flagged as sent via aprs
m = server.fetch([msgid], ['RFC822'])
if "APRS" not in server.get_flags(msgid)[msgid]:
# if msg not flagged as sent via aprs
server.fetch([msgid], ['RFC822'])
(body, from_addr) = parse_email(msgid, data, server)
server.remove_flags(msgid, [SEEN]) # unset seen flag, will stay bold in email client
# unset seen flag, will stay bold in email client
server.remove_flags(msgid, [SEEN])
if from_addr in shortcuts_inverted: # reverse lookup of a shortcut
if from_addr in shortcuts_inverted:
# reverse lookup of a shortcut
from_addr = shortcuts_inverted[from_addr]
reply = "-" + from_addr + " " + body
#print "Sending message via aprs: " + reply
send_message(CONFIG['ham']['callsign'], reply) #radio
server.add_flags(msgid, ['APRS']) #flag message as sent via aprs
server.remove_flags(msgid, [SEEN]) #unset seen flag, will stay bold in email client
# print "Sending message via aprs: " + reply
# radio
send_message(CONFIG['ham']['callsign'], reply)
# flag message as sent via aprs
server.add_flags(msgid, ['APRS'])
# unset seen flag, will stay bold in email client
server.remove_flags(msgid, [SEEN])
server.logout()
### end check_email()
# end check_email()
def send_ack_thread(tocall, ack, retry_count):
tocall = tocall.ljust(9) # pad to nine chars
line = CONFIG['aprs']['login'] + ">APRS::" + tocall + ":ack" + str(ack) + "\n"
for i in range(retry_count, 0, -1):
LOG.info("Sending ack __________________ Tx(" + str(i) + ")")
LOG.info("Raw : " + line)
LOG.info("To : " + tocall)
LOG.info("Ack number : " + str(ack))
tn.write(line)
time.sleep(31) # aprs duplicate detection is 30 secs? (21 only sends first, 28 skips middle)
return()
### end_send_ack_thread
tocall = tocall.ljust(9) # pad to nine chars
line = "%s>APRS::%s:ack%s\n" % (CONFIG['aprs']['login'], tocall, ack)
for i in range(retry_count, 0, -1):
LOG.info("Sending ack __________________ Tx(%s)" % i)
LOG.info("Raw : %s" % line)
LOG.info("To : %s" % tocall)
LOG.info("Ack number : %s" % ack)
tn.write(line)
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
time.sleep(31)
return()
# end_send_ack_thread
def send_ack(tocall, ack):
retry_count = 3
thread = threading.Thread(target = send_ack_thread, args = (tocall, ack, retry_count))
thread.start()
return()
### end send_ack()
retry_count = 3
thread = threading.Thread(target=send_ack_thread,
args=(tocall, ack, retry_count))
thread.start()
return()
# end send_ack()
def send_message_thread(tocall, message, this_message_number, retry_count):
global ack_dict
line = (CONFIG['aprs']['login'] + ">APRS::" + tocall + ":" + message +
"{" + str(this_message_number) + "\n")
for i in range(retry_count, 0, -1):
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
LOG.debug(pprint.pformat(ack_dict))
if ack_dict[this_message_number] != 1:
LOG.info("Sending message_______________ " +
str(this_message_number) + "(Tx" + str(i) + ")")
LOG.info("Raw : " + line)
LOG.info("To : " + tocall)
LOG.info("Message : " + message)
tn.write(line)
sleeptime = (retry_count - i + 1) * 31 # decaying repeats, 31 to 93 second intervals
time.sleep(sleeptime)
else:
break
return
### end send_message_thread
global ack_dict
line = (CONFIG['aprs']['login'] + ">APRS::" + tocall + ":" + message +
"{" + str(this_message_number) + "\n")
for i in range(retry_count, 0, -1):
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
LOG.debug(pprint.pformat(ack_dict))
if ack_dict[this_message_number] != 1:
LOG.info("Sending message_______________ " +
str(this_message_number) + "(Tx" + str(i) + ")")
LOG.info("Raw : " + line)
LOG.info("To : " + tocall)
LOG.info("Message : " + message)
tn.write(line)
# decaying repeats, 31 to 93 second intervals
sleeptime = (retry_count - i + 1) * 31
time.sleep(sleeptime)
else:
break
return
# end send_message_thread
def send_message(tocall, message):
@ -307,39 +350,50 @@ def send_message(tocall, message):
if message_number > 98: # global
message_number = 0
message_number += 1
if len(ack_dict) > 90: # empty ack dict if it's really big, could result in key error later
LOG.debug("DEBUG: Length of ack dictionary is big at " + str(len(ack_dict)) + " clearing.")
if len(ack_dict) > 90:
# empty ack dict if it's really big, could result in key error later
LOG.debug("DEBUG: Length of ack dictionary is big at %s clearing." %
len(ack_dict))
ack_dict.clear()
LOG.debug(pprint.pformat(ack_dict))
LOG.debug("DEBUG: Cleared ack dictionary, ack_dict length is now " + str(len(ack_dict)) + ".")
LOG.debug("DEBUG: Cleared ack dictionary, ack_dict length is now %s." %
len(ack_dict))
ack_dict[message_number] = 0 # clear ack for this message number
tocall = tocall.ljust(9) # pad to nine chars
message = message[:67] # max? ftm400 displays 64, raw msg shows 74
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
tocall = tocall.ljust(9) # pad to nine chars
# max? ftm400 displays 64, raw msg shows 74
# and ftm400-send is max 64. setting this to
# 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs
message = message[:67]
thread = threading.Thread(
target = send_message_thread,
args = (tocall, message, message_number, retry_count))
target=send_message_thread,
args=(tocall, message, message_number, retry_count))
thread.start()
return()
### end send_message()
# end send_message()
def process_message(line):
f = re.search('^(.*)>', line)
fromcall = f.group(1)
searchstring = '::' + CONFIG['aprs']['login'] + '[ ]*:(.*)' # verify this, callsign is padded out with spaces to colon
searchstring = '::%s[ ]*:(.*)' % CONFIG['aprs']['login']
# verify this, callsign is padded out with spaces to colon
m = re.search(searchstring, line)
fullmessage = m.group(1)
ack_attached = re.search('(.*){([0-9A-Z]+)', fullmessage) # ack formats include: {1, {AB}, {12
if ack_attached: # "{##" suffix means radio wants an ack back
message = ack_attached.group(1) # message content
ack_num = ack_attached.group(2) # suffix number to use in ack
ack_attached = re.search('(.*){([0-9A-Z]+)', fullmessage)
# ack formats include: {1, {AB}, {12
if ack_attached:
# "{##" suffix means radio wants an ack back
# message content
message = ack_attached.group(1)
# suffix number to use in ack
ack_num = ack_attached.group(2)
else:
message = fullmessage
ack_num = "0" # ack not requested, but lets send one as 0
# ack not requested, but lets send one as 0
ack_num = "0"
LOG.info("Received message______________")
LOG.info("Raw : " + line)
@ -348,7 +402,7 @@ def process_message(line):
LOG.info("Msg number : " + str(ack_num))
return (fromcall, message, ack_num)
### end process_message()
# end process_message()
def send_email(to_addr, content):
@ -379,7 +433,7 @@ def send_email(to_addr, content):
return(-1)
s.quit()
return(0)
### end send_email
# end send_email
# Setup the logging faciility
@ -413,7 +467,7 @@ def setup_logging(args):
LOG.addHandler(sh)
### main() ###
# main() ###
def main(args=args):
global CONFIG
@ -429,7 +483,7 @@ def main(args=args):
user = CONFIG['aprs']['login']
password = CONFIG['aprs']['password']
LOG.info("LOGIN to APRSD with user '%s'" % user)
tn.write("user %s pass %s vers aprsd 0.99\n" % (user, password) )
tn.write("user %s pass %s vers aprsd 0.99\n" % (user, password))
time.sleep(2)
check_email_thread() # start email reader thread
@ -438,11 +492,11 @@ def main(args=args):
while True:
line = ""
try:
for char in tn.read_until("\n",100):
for char in tn.read_until("\n", 100):
line = line + char
line = line.replace('\n', '')
LOG.info(line)
searchstring = '::' + user
searchstring = '::%s' % user
# is aprs message to us, not beacon, status, etc
if re.search(searchstring, line):
(fromcall, message, ack) = process_message(line)
@ -454,7 +508,7 @@ def main(args=args):
if re.search('^ack[0-9]+', message):
# put message_number:1 in dict to record the ack
a = re.search('^ack([0-9]+)', message)
ack_dict.update({int(a.group(1)):1})
ack_dict.update({int(a.group(1)): 1})
continue
# EMAIL (-)
@ -539,7 +593,7 @@ def main(args=args):
response2 = urllib.urlopen(url2)
wx_data = json.loads(response2.read())
reply = wx_data['location']['areaDescription'] + " " + str(altfeet) + "' " + str(lat) + "," + str(lon) + " " + str("%.1f" % round(delta_hours,1)) + "h ago"
reply = reply.encode('ascii',errors='ignore') # unicode to ascii
reply = reply.encode('ascii', errors='ignore') # unicode to ascii
send_message(fromcall, reply.rstrip())
except:
reply = "Unable to find you (send beacon?)"
@ -547,36 +601,51 @@ def main(args=args):
# WEATHER (w) "42F(68F/48F) Haze. Tonight, Haze then Chance Rain."
elif re.search('^w', message):
# get my last location from aprsis then get weather from weather service
# get my last location from aprsis then get weather from
# weather service
try:
url = "http://api.aprs.fi/api/get?name=" + fromcall + "&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
url = ("http://api.aprs.fi/api/get?"
"&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
"&name=%s" % fromcall)
response = urllib.urlopen(url)
aprs_data = json.loads(response.read())
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
url2 = "https://forecast.weather.gov/MapClick.php?lat=" + str(lat) + "&lon=" + str(lon) + "&FcstType=json"
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
url2 = ("https://forecast.weather.gov/MapClick.php?lat=%s"
"&lon=%s&FcstType=json" % (lat, lon))
response2 = urllib.urlopen(url2)
wx_data = json.loads(response2.read())
reply = wx_data['currentobservation']['Temp'] + "F(" + wx_data['data']['temperature'][0] + "F/" + wx_data['data']['temperature'][1] + "F) " + wx_data['data']['weather'][0] + ". " + wx_data['time']['startPeriodName'][1] + ", " + wx_data['data']['weather'][1] + "."
reply = reply.encode('ascii',errors='ignore') # unicode to ascii
reply = "%sF(%sF/%sF) %s. %s, %s." % (
wx_data['currentobservation']['Temp'],
wx_data['data']['temperature'][0],
wx_data['data']['temperature'][1],
wx_data['data']['weather'][0],
wx_data['time']['startPeriodName'][1],
wx_data['data']['weather'][1])
# unicode to ascii
reply = reply.encode('ascii', errors='ignore')
send_message(fromcall, reply.rstrip())
except:
except Exception:
reply = "Unable to find you (send beacon?)"
send_message(fromcall, reply)
# USAGE
else:
reply = "usage: time, fortune, loc, weath, -emailaddr emailbody, -#(resend)"
reply = ("usage: time, fortune, loc, weath, -emailaddr "
"emailbody, -#(resend)")
send_message(fromcall, reply)
time.sleep(1) # let any threads do their thing, then ack
send_ack(fromcall, ack) # send an ack last
# let any threads do their thing, then ack
time.sleep(1)
# send an ack last
send_ack(fromcall, ack)
except Exception, e:
except Exception as e:
LOG.error("Error in mainline loop:")
LOG.error("%s" % str(e))
LOG.error("Exiting.")
#sys.exit(1) # merely a suggestion
# sys.exit(1) # merely a suggestion
os._exit(1)
# end while True

View File

@ -36,6 +36,7 @@ imap:
log = logging.getLogger('APRSD')
def env(*vars, **kwargs):
"""This returns the first environment variable set.
if none are non-empty, defaults to '' or keyword arg default
@ -55,10 +56,12 @@ def get_config():
config = yaml.load(stream)
return config
else:
log.critical("%s is missing, please create a config file" % config_file)
print("\nCopy to ~/.aprsd/config.yml and edit\n\nSample config:\n %s" % example_config)
log.critical("%s is missing, please create config file" % config_file)
print("\nCopy to ~/.aprsd/config.yml and edit\n\nSample config:\n %s"
% example_config)
sys.exit(-1)
# This method tries to parse the config yaml file
# and consume the settings.
# If the required params don't exist,

1
test-requirements.txt Normal file
View File

@ -0,0 +1 @@
flake8

25
tools/fast8.sh Executable file
View File

@ -0,0 +1,25 @@
#!/bin/bash
NUM_COMMITS=${FAST8_NUM_COMMITS:-1}
if [[ $NUM_COMMITS = "smart" ]]; then
# Run on all commits not submitted yet
# (sort of -- only checks vs. "master" since this is easy)
NUM_COMMITS=$(git cherry master | wc -l)
fi
echo "Checking last $NUM_COMMITS commits."
cd $(dirname "$0")/..
CHANGED=$(git diff --name-only HEAD~${NUM_COMMITS} | tr '\n' ' ')
# Skip files that don't exist
# (have been git rm'd)
CHECK=""
for FILE in $CHANGED; do
if [ -f "$FILE" ]; then
CHECK="$CHECK $FILE"
fi
done
diff -u --from-file /dev/null $CHECK | flake8 --diff

39
tox.ini Normal file
View File

@ -0,0 +1,39 @@
[tox]
minversion = 1.6
skipdist = True
envlist = py27,py36,py37,fast8,pep8,cover,docs
[testenv]
setenv = VIRTUAL_ENV={envdir}
usedevelop = True
install_command = pip install {opts} {packages}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands =
pytest {posargs}
[testenv:cover]
commands =
pytest --cov=aprsd
[testenv:docs]
deps = -r{toxinidir}/test-requirements.txt
commands = sphinx-build -b html docs/source docs/html
[testenv:pep8]
commands =
flake8 {posargs} aprsd test
[testenv:fast8]
basepython = python3
# Use same environment directory as pep8 env to save space and install time
envdir = {toxworkdir}/pep8
commands =
{toxinidir}/tools/fast8.sh
passenv = FAST8_NUM_COMMITS
[flake8]
show-source = True
ignore = E713
exclude = .venv,.git,.tox,dist,doc,.ropeproject