Fixed all pep8 errors and some py3 errors

This introduced the six lib which can translate common
py2 vs py3 incompatibilities.
https://six.readthedocs.io/
This commit is contained in:
Hemna 2020-12-04 08:56:26 -05:00
parent e171e16854
commit 0c40689743
5 changed files with 147 additions and 107 deletions

View File

@ -39,6 +39,7 @@ args = parser.parse_args()
CONFIG = None
LOG = logging.getLogger('ARPSSERVER')
class MyAPRSServer(TelnetHandler):
@command('echo')
@ -60,9 +61,10 @@ class MyAPRSServer(TelnetHandler):
def signal_handler(signal, frame):
LOG.info("Ctrl+C, exiting.")
#sys.exit(0) # thread ignores this
# sys.exit(0) # thread ignores this
os._exit(0)
# Setup the logging faciility
# to disable logging to stdout, but still log to file
# use the --quiet option on the cmdln
@ -83,7 +85,7 @@ def setup_logging(args):
log_formatter = logging.Formatter(fmt=log_format,
datefmt=date_format)
fh = RotatingFileHandler('aprs-server.log',
maxBytes=(10248576*5),
maxBytes=(10248576 * 5),
backupCount=4)
fh.setFormatter(log_formatter)
LOG.addHandler(fh)
@ -122,7 +124,7 @@ class ClientThread(threading.Thread):
msg = self.msg_q.get(True, 0.05)
if msg:
LOG.info("Sending message '%s'" % msg)
self.conn.send(msg+"\n")
self.conn.send(msg + "\n")
except Queue.Empty:
pass
@ -145,7 +147,6 @@ class InputThread(threading.Thread):
self.msg_q.put(text)
threads = []
@ -166,7 +167,7 @@ def main(args):
ip = CONFIG['aprs']['host']
port = CONFIG['aprs']['port']
LOG.info("Start server listening on %s:%s" % (args.ip, args.port))
tcpsock.bind((ip,port))
tcpsock.bind((ip, port))
in_t = None
while True:
@ -182,7 +183,6 @@ def main(args):
in_t.start()
in_t.join()
for t in threads:
t.join()

View File

@ -61,10 +61,10 @@ def fuzzy(hour, minute, degree=1):
if dmin == 0:
s1 = f1
pos = pos - 1
elif dmin <= base/2:
elif dmin <= base / 2:
s1 = f2
if minute < 30:
pos = pos-1
pos = pos - 1
else:
s1 = f0
if minute > 30:
@ -118,4 +118,6 @@ def main():
print(fuzzy(h, m, deg))
return
main()
if __name__ == "__main__":
main()

View File

@ -31,10 +31,11 @@ import socket
import pprint
import re
import signal
import six
import smtplib
import subprocess
import sys
#import telnetlib
# import telnetlib
import threading
import time
import urllib
@ -96,7 +97,7 @@ parser.add_argument("--quiet",
args = parser.parse_args()
#def setup_connection():
# def setup_connection():
# global tn
# host = CONFIG['aprs']['host']
# port = CONFIG['aprs']['port']
@ -113,66 +114,84 @@ def setup_connection():
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((CONFIG['aprs']['host'], 14580))
except Exception, e:
print "Unable to connect to APRS-IS server.\n"
print str(e)
except Exception as e:
print("Unable to connect to APRS-IS server.\n")
print(str(e))
os._exit(1)
sock_file = sock.makefile(mode='r', bufsize=0 )
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # disable nagle algorithm
sock_file = sock.makefile(mode='r', bufsize=0)
# disable nagle algorithm
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 512) # buffer size
def signal_handler(signal, frame):
LOG.info("Ctrl+C, exiting.")
# sys.exit(0) # thread ignores this
os._exit(0)
# end signal_handler
# end signal_handler
def parse_email(msgid, data, server):
envelope = data[b'ENVELOPE']
f = re.search('([\.\w_-]+@[\.\w_-]+)', str(envelope.from_[0]) ) # email address match
# email address match
# use raw string to avoid invalid escape secquence errors r"string here"
f = re.search(r"([\.\w_-]+@[\.\w_-]+)", str(envelope.from_[0]))
if f is not None:
from_addr = f.group(1)
from_addr = f.group(1)
else:
from_addr = "noaddr"
from_addr = "noaddr"
m = server.fetch([msgid], ['RFC822'])
msg = email.message_from_string(m[msgid]['RFC822'])
if msg.is_multipart():
text = ""
html = None
body = "* unreadable msg received" # default in case body somehow isn't set below - happened once
for part in msg.get_payload():
if part.get_content_charset() is None:
# We cannot know the character set, so return decoded "something"
text = part.get_payload(decode=True)
continue
charset = part.get_content_charset()
if part.get_content_type() == 'text/plain':
text = unicode(part.get_payload(decode=True), str(charset), "ignore").encode('utf8', 'replace')
if part.get_content_type() == 'text/html':
html = unicode(part.get_payload(decode=True), str(charset), "ignore").encode('utf8', 'replace')
if text is not None:
body = text.strip() # strip removes white space fore and aft of string
else:
body = html.strip()
else:
if msg.get_content_charset() == None: # email.uscc.net sends no charset, blows up unicode function below
text = unicode(msg.get_payload(decode=True), 'US-ASCII', 'ignore').encode('utf8', 'replace')
else:
text = unicode(msg.get_payload(decode=True), msg.get_content_charset(), 'ignore').encode('utf8', 'replace')
body = text.strip()
body = re.sub('<[^<]+?>', '', body) # strip all html tags
body = body.replace("\n", " ").replace("\r", " ") # strip CR/LF, make it one line, .rstrip fails at this
return(body, from_addr)
## end parse_email
text = ""
html = None
# default in case body somehow isn't set below - happened once
body = "* unreadable msg received"
for part in msg.get_payload():
if part.get_content_charset() is None:
# We cannot know the character set,
# so return decoded "something"
text = part.get_payload(decode=True)
continue
charset = part.get_content_charset()
if part.get_content_type() == 'text/plain':
text = six.text_type(
part.get_payload(decode=True), str(charset),
"ignore").encode('utf8', 'replace')
if part.get_content_type() == 'text/html':
html = six.text_type(
part.get_payload(decode=True),
str(charset),
"ignore").encode('utf8', 'replace')
if text is not None:
# strip removes white space fore and aft of string
body = text.strip()
else:
body = html.strip()
else:
# email.uscc.net sends no charset, blows up unicode function below
if msg.get_content_charset() is None:
text = six.text_type(
msg.get_payload(decode=True),
'US-ASCII',
'ignore').encode('utf8', 'replace')
else:
text = six.text_type(
msg.get_payload(decode=True),
msg.get_content_charset(),
'ignore').encode('utf8', 'replace')
body = text.strip()
# strip all html tags
body = re.sub('<[^<]+?>', '', body)
# strip CR/LF, make it one line, .rstrip fails at this
body = body.replace("\n", " ").replace("\r", " ")
return(body, from_addr)
# end parse_email
def resend_email(count, fromcall):
@ -191,7 +210,7 @@ def resend_email(count, fromcall):
CONFIG['imap']['login']))
server = IMAPClient(CONFIG['imap']['host'], use_uid=True)
server.login(CONFIG['imap']['login'], CONFIG['imap']['password'])
select_info = server.select_folder('INBOX')
server.select_folder('INBOX')
messages = server.search(['SINCE', today])
LOG.debug("%d messages received today" % len(messages))
@ -236,13 +255,13 @@ def check_email_thread(check_email_delay):
while True:
# threading.Timer(55, check_email_thread).start()
time.sleep(check_email_delay)
shortcuts = CONFIG['shortcuts']
# swap key/value
shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()])
date = datetime.datetime.now()
month = date.strftime("%B")[:3] # Nov, Mar, Apr
day = date.day
@ -258,7 +277,7 @@ def check_email_thread(check_email_delay):
server.login(CONFIG['imap']['login'], CONFIG['imap']['password'])
except Exception:
LOG.exception("Failed to login with IMAP server")
#return
# return
continue
server.select_folder('INBOX')
@ -270,13 +289,13 @@ def check_email_thread(check_email_delay):
envelope = data[b'ENVELOPE']
LOG.debug('ID:%d "%s" (%s)' %
(msgid, envelope.subject.decode(), envelope.date))
f = re.search('([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)',
f = re.search(r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)",
str(envelope.from_[0]))
if f is not None:
from_addr = f.group(1)
else:
from_addr = "noaddr"
if "APRS" not in server.get_flags(msgid)[msgid]:
# if msg not flagged as sent via aprs
server.fetch([msgid], ['RFC822'])
@ -304,13 +323,14 @@ def check_email_thread(check_email_delay):
def send_ack_thread(tocall, ack, retry_count):
tocall = tocall.ljust(9) # pad to nine chars
line = "%s>APRS::%s:ack%s\n" % (CONFIG['aprs']['login'], tocall, ack)
line = ("{}>APRS::{}:ack{}\n".format(
CONFIG['aprs']['login'], tocall, ack))
for i in range(retry_count, 0, -1):
LOG.info("Sending ack __________________ Tx(%s)" % i)
LOG.info("Raw : %s" % line)
LOG.info("To : %s" % tocall)
LOG.info("Ack number : %s" % ack)
#tn.write(line)
LOG.info("Sending ack __________________ Tx({})".format(i))
LOG.info("Raw : {}".format(line))
LOG.info("To : {}".format(tocall))
LOG.info("Ack number : {}".format(ack))
# tn.write(line)
sock.send(line)
# aprs duplicate detection is 30 secs?
# (21 only sends first, 28 skips middle)
@ -330,18 +350,26 @@ def send_ack(tocall, ack):
def send_message_thread(tocall, message, this_message_number, retry_count):
global ack_dict
line = (CONFIG['aprs']['login'] + ">APRS::" + tocall + ":" + message +
"{" + str(this_message_number) + "\n")
# line = (CONFIG['aprs']['login'] + ">APRS::" + tocall + ":" + message
# + "{" + str(this_message_number) + "\n")
line = ("{}>APRS::{}:{}{{{}\n".format(
CONFIG['aprs']['login'],
tocall, message,
str(this_message_number),
))
for i in range(retry_count, 0, -1):
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
LOG.debug(pprint.pformat(ack_dict))
if ack_dict[this_message_number] != 1:
LOG.info("Sending message_______________ " +
str(this_message_number) + "(Tx" + str(i) + ")")
LOG.info("Raw : " + line)
LOG.info("To : " + tocall)
LOG.info("Message : " + message)
#tn.write(line)
LOG.info("Sending message_______________ {}(Tx{})"
.format(
str(this_message_number),
str(i)
))
LOG.info("Raw : {}".format(line))
LOG.info("To : {}".format(tocall))
LOG.info("Message : {}".format(message))
# tn.write(line)
sock.send(line)
# decaying repeats, 31 to 93 second intervals
sleeptime = (retry_count - i + 1) * 31
@ -465,7 +493,7 @@ def setup_logging(args):
log_formatter = logging.Formatter(fmt=log_format,
datefmt=date_format)
fh = RotatingFileHandler(CONFIG['aprs']['logfile'],
maxBytes=(10248576*5),
maxBytes=(10248576 * 5),
backupCount=4)
fh.setFormatter(log_formatter)
LOG.addHandler(fh)
@ -492,12 +520,12 @@ def main(args=args):
user = CONFIG['aprs']['login']
password = CONFIG['aprs']['password']
LOG.info("LOGIN to APRSD with user '%s'" % user)
#tn.write("user %s pass %s vers aprsd 0.99\n" % (user, password))
# tn.write("user %s pass %s vers aprsd 0.99\n" % (user, password))
sock.send("user %s pass %s vers aprsd 0.99\n" % (user, password))
time.sleep(2)
check_email_delay = 60 # initial email check interval
check_email_delay = 60 # initial email check interval
checkemailthread = threading.Thread(target=check_email_thread, args=(check_email_delay, )) # args must be tuple
checkemailthread.start()
@ -505,9 +533,9 @@ def main(args=args):
while True:
line = ""
try:
#for char in tn.read_until("\n", 100):
# for char in tn.read_until("\n", 100):
# line = line + char
#line = line.replace('\n', '')
# line = line.replace('\n', '')
line = sock_file.readline().strip()
LOG.info(line)
searchstring = '::%s' % user
@ -526,31 +554,40 @@ def main(args=args):
continue
# EMAIL (-)
elif re.search('^-.*', message): # is email command
# is email command
elif re.search('^-.*', message):
searchstring = '^' + CONFIG['ham']['callsign'] + '.*'
if re.search(searchstring, fromcall): # only I can do email
r = re.search('^-([0-9])[0-9]*$', message) # digits only, first one is number of emails to resend
# only I can do email
if re.search(searchstring, fromcall):
# digits only, first one is number of emails to resend
r = re.search('^-([0-9])[0-9]*$', message)
if r is not None:
resend_email(r.group(1), fromcall)
elif re.search('^-([A-Za-z0-9_\-\.@]+) (.*)', message): # -user@address.com body of email
a = re.search('^-([A-Za-z0-9_\-\.@]+) (.*)', message) # (same search again)
# -user@address.com body of email
elif re.search(r"^-([A-Za-z0-9_\-\.@]+) (.*)", message):
# (same search again)
a = re.search(r"^-([A-Za-z0-9_\-\.@]+) (.*)", message)
if a is not None:
to_addr = a.group(1)
content = a.group(2)
if content == 'mapme': # send recipient link to aprs.fi map
content = "Click for my location: http://aprs.fi/" + CONFIG['ham']['callsign']
# send recipient link to aprs.fi map
if content == 'mapme':
content = (
"Click for my location: http://aprs.fi/{}".
format(CONFIG['ham']['callsign']))
too_soon = 0
now = time.time()
if ack in email_sent_dict: # see if we sent this msg number recently
# see if we sent this msg number recently
if ack in email_sent_dict:
timedelta = now - email_sent_dict[ack]
if ( timedelta < 300 ): # five minutes
if (timedelta < 300): # five minutes
too_soon = 1
if not too_soon or ack == 0:
send_result = send_email(to_addr, content)
if send_result != 0:
send_message(fromcall, "-" + to_addr + " failed")
else:
#send_message(fromcall, "-" + to_addr + " sent")
# send_message(fromcall, "-" + to_addr + " sent")
if len(email_sent_dict) > 98: # clear email sent dictionary if somehow goes over 100
LOG.debug("DEBUG: email_sent_dict is big (" + str(len(email_sent_dict)) + ") clearing out.")
email_sent_dict.clear()
@ -567,7 +604,7 @@ def main(args=args):
m = stm.tm_min
cur_time = fuzzy(h, m, 1)
reply = cur_time + " (" + str(h) + ":" + str(m).rjust(2, '0') + "PDT)" + " (" + message.rstrip() + ")"
thread = threading.Thread(target = send_message, args = (fromcall, reply))
thread = threading.Thread(target=send_message, args=(fromcall, reply))
thread.start()
# FORTUNE (f)
@ -615,33 +652,33 @@ def main(args=args):
elif re.search('^[lL]', message):
# get last location of a callsign, get descriptive name from weather service
try:
a = re.search('^.*\s+(.*)', message) # optional second argument is a callsign to search
a = re.search(r"'^.*\s+(.*)", message) # optional second argument is a callsign to search
if a is not None:
searchcall = a.group(1)
searchcall = searchcall.upper()
searchcall = a.group(1)
searchcall = searchcall.upper()
else:
searchcall = fromcall # if no second argument, search for calling station
searchcall = fromcall # if no second argument, search for calling station
url = "http://api.aprs.fi/api/get?name=" + searchcall + "&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
response = urllib.urlopen(url)
aprs_data = json.loads(response.read())
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
lat = aprs_data['entries'][0]['lat']
lon = aprs_data['entries'][0]['lng']
try: # altitude not always provided
alt = aprs_data['entries'][0]['altitude']
except:
alt = 0
altfeet = int(alt * 3.28084)
alt = aprs_data['entries'][0]['altitude']
except Exception:
alt = 0
altfeet = int(alt * 3.28084)
aprs_lasttime_seconds = aprs_data['entries'][0]['lasttime']
aprs_lasttime_seconds = aprs_lasttime_seconds.encode('ascii',errors='ignore') #unicode to ascii
aprs_lasttime_seconds = aprs_lasttime_seconds.encode('ascii', errors='ignore') # unicode to ascii
delta_seconds = time.time() - int(aprs_lasttime_seconds)
delta_hours = delta_seconds / 60 / 60
url2 = "https://forecast.weather.gov/MapClick.php?lat=" + str(lat) + "&lon=" + str(lon) + "&FcstType=json"
response2 = urllib.urlopen(url2)
wx_data = json.loads(response2.read())
reply = searchcall + ": " + wx_data['location']['areaDescription'] + " " + str(altfeet) + "' " + str(lat) + "," + str(lon) + " " + str("%.1f" % round(delta_hours,1)) + "h ago"
reply = reply.encode('ascii',errors='ignore') # unicode to ascii
reply = searchcall + ": " + wx_data['location']['areaDescription'] + " " + str(altfeet) + "' " + str(lat) + "," + str(lon) + " " + str("%.1f" % round(delta_hours, 1)) + "h ago"
reply = reply.encode('ascii', errors='ignore') # unicode to ascii
send_message(fromcall, reply.rstrip())
except:
except Exception:
reply = "Unable to find station " + searchcall + ". Sending beacons?"
send_message(fromcall, reply.rstrip())
@ -694,7 +731,7 @@ def main(args=args):
os._exit(1)
# end while True
#tn.close()
# tn.close()
sock.shutdown(0)
sock.close()

View File

@ -1,3 +1,4 @@
pbr
imapclient
pyyaml
six

View File

@ -23,7 +23,7 @@ commands = sphinx-build -b html docs/source docs/html
[testenv:pep8]
commands =
flake8 {posargs} aprsd test
flake8 {posargs} aprsd
[testenv:fast8]
basepython = python3
@ -35,5 +35,5 @@ passenv = FAST8_NUM_COMMITS
[flake8]
show-source = True
ignore = E713
ignore = E713,E501
exclude = .venv,.git,.tox,dist,doc,.ropeproject