From 30455153ba67ad5e16521e2d584dfcf69fd54d09 Mon Sep 17 00:00:00 2001 From: Abigail Gold Date: Tue, 7 Jan 2020 04:28:51 -0500 Subject: [PATCH 1/6] refactor ae7q call, add ae7q c alias Process on #95 --- CHANGELOG.md | 1 + exts/ae7q.py | 93 +++++++++++++++++++++++++++------------------------- 2 files changed, 50 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e8648c6..794d353 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [Unreleased] ### Added - Added Trustee field to qrz command for club callsigns. +- Added alias for `ae7q call` command (`ae7q c`). ### Changed - Changelog command to accept a version as argument. - The qrz command can now link to a QRZ page instead of embedding the data with the `--link` flag. diff --git a/exts/ae7q.py b/exts/ae7q.py index 6671c7c..de5cee6 100644 --- a/exts/ae7q.py +++ b/exts/ae7q.py @@ -10,9 +10,8 @@ Test callsigns: KN8U: active, restricted AB2EE: expired, restricted KE8FGB: assigned once, no restrictions -NA2AAA: unassigned, no records -KC4USA: reserved but has call history -WF4EMA: " +KV4AAA: unassigned, no records +KC4USA: reserved, no call history, *but* has application history """ import discord.ext.commands as commands @@ -33,7 +32,7 @@ class AE7QCog(commands.Cog): if ctx.invoked_subcommand is None: await ctx.send_help(ctx.command) - @_ae7q_lookup.command(name="call", category=cmn.cat.lookup) + @_ae7q_lookup.command(name="call", aliases=["c"], category=cmn.cat.lookup) async def _ae7q_call(self, ctx: commands.Context, callsign: str): '''Look up the history for a callsign on [ae7q.com](http://ae7q.com/).''' callsign = callsign.upper() @@ -51,59 +50,41 @@ class AE7QCog(commands.Cog): page = await resp.text() soup = BeautifulSoup(page, features="html.parser") - tables = soup.select("table.Database") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] - for table in tables: - rows = table.find_all("tr") - if len(rows) > 1 and len(rows[0]) > 1: - break - if desc == '': - for row in rows: - desc += " ".join(row.getText().split()) - desc += '\n' - desc = desc.replace(callsign, f'`{callsign}`') - rows = None + table = tables[0] - first_header = ''.join(rows[0].find_all("th")[0].strings) + # find the first table in the page, and use it to make a description + if len(table[0]) == 1: + for row in table: + desc += " ".join(row.getText().split()) + desc += '\n' + desc = desc.replace(callsign, f'`{callsign}`') + table = tables[1] - if rows is None or first_header != 'Entity Name': + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header != 'Entity Name': embed.title = f"AE7Q History for {callsign}" embed.colour = cmn.colours.bad - embed.url = f"{base_url}{callsign}" + embed.url = base_url + callsign embed.description = desc embed.description += f'\nNo records found for `{callsign}`' await ctx.send(embed=embed) return - table_contents = [] # store your table here - for tr in rows: - if rows.index(tr) == 0: - # first_header = ''.join(tr.find_all("th")[0].strings) - # if first_header == 'Entity Name': - # print('yooooo') - continue - row_cells = [] - for td in tr.find_all('td'): - if td.getText().strip() != '': - row_cells.append(td.getText().strip()) - else: - row_cells.append('-') - if 'colspan' in td.attrs and int(td.attrs['colspan']) > 1: - for i in range(int(td.attrs['colspan']) - 1): - row_cells.append(row_cells[-1]) - for i, cell in enumerate(row_cells): - if cell == '"': - row_cells[i] = table_contents[-1][i] - if len(row_cells) > 1: - table_contents += [row_cells] + table = await process_table(table) embed = cmn.embed_factory(ctx) embed.title = f"AE7Q Records for {callsign}" embed.colour = cmn.colours.good - embed.url = f"{base_url}{callsign}" + embed.url = base_url + callsign - for row in table_contents[0:3]: - header = f'**{row[0]}** ({row[1]})' + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[1]})' # **Name** (Applicant Type) body = (f'Class: *{row[2]}*\n' f'Region: *{row[3]}*\n' f'Status: *{row[4]}*\n' @@ -113,9 +94,10 @@ class AE7QCog(commands.Cog): f'Expires: *{row[8]}*') embed.add_field(name=header, value=body, inline=False) + if len(table) > 3: + desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' + embed.description = desc - if len(table_contents) > 3: - embed.description += f'\nRecords 1 to 3 of {len(table_contents)}. See ae7q.com for more...' await ctx.send(embed=embed) @@ -139,5 +121,28 @@ class AE7QCog(commands.Cog): # pass +async def process_table(table: list): + """Processes tables (including headers) and returns the processed table""" + table_contents = [] + for tr in table[1:]: + row = [] + for td in tr.find_all('td'): + cell_val = td.getText().strip() + row.append(cell_val if cell_val else '-') + + # take care of columns that span multiple rows by copying the contents rightward + if 'colspan' in td.attrs and int(td.attrs['colspan']) > 1: + for i in range(int(td.attrs['colspan']) - 1): + row.append(row[-1]) + + # get rid of ditto marks by copying the contents from the previous row + for i, cell in enumerate(row): + if cell == "\"": + row[i] = table_contents[-1][i] + # add row to table + table_contents += [row] + return table_contents + + def setup(bot: commands.Bot): bot.add_cog(AE7QCog(bot)) From eaa47fc7244e67c174b922ff45626ac2b638ff53 Mon Sep 17 00:00:00 2001 From: Abigail Gold Date: Tue, 7 Jan 2020 15:49:48 -0500 Subject: [PATCH 2/6] add ae7q trustee command, add short aliases for ae7q subcommands, bugfixes in ae7q call Progress on #95 Fixes #153 --- exts/ae7q.py | 117 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 95 insertions(+), 22 deletions(-) diff --git a/exts/ae7q.py b/exts/ae7q.py index de5cee6..505cad8 100644 --- a/exts/ae7q.py +++ b/exts/ae7q.py @@ -34,7 +34,7 @@ class AE7QCog(commands.Cog): @_ae7q_lookup.command(name="call", aliases=["c"], category=cmn.cat.lookup) async def _ae7q_call(self, ctx: commands.Context, callsign: str): - '''Look up the history for a callsign on [ae7q.com](http://ae7q.com/).''' + '''Look up the history of a callsign on [ae7q.com](http://ae7q.com/).''' callsign = callsign.upper() desc = '' base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" @@ -66,7 +66,7 @@ class AE7QCog(commands.Cog): first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None # catch if the wrong table was selected - if first_header != 'Entity Name': + if first_header is None or first_header != 'Entity Name': embed.title = f"AE7Q History for {callsign}" embed.colour = cmn.colours.bad embed.url = base_url + callsign @@ -75,16 +75,16 @@ class AE7QCog(commands.Cog): await ctx.send(embed=embed) return - table = await process_table(table) + table = await process_table(table[1:]) embed = cmn.embed_factory(ctx) - embed.title = f"AE7Q Records for {callsign}" + embed.title = f"AE7Q History for {callsign}" embed.colour = cmn.colours.good embed.url = base_url + callsign # add the first three rows of the table to the embed for row in table[0:3]: - header = f'**{row[0]}** ({row[1]})' # **Name** (Applicant Type) + header = f'**{row[0]}** ({row[1]})' # **Name** (Applicant Type) body = (f'Class: *{row[2]}*\n' f'Region: *{row[3]}*\n' f'Status: *{row[4]}*\n' @@ -101,30 +101,103 @@ class AE7QCog(commands.Cog): await ctx.send(embed=embed) - # TODO: write commands for other AE7Q response types? - # @_ae7q_lookup.command(name="trustee") - # async def _ae7q_trustee(self, ctx: commands.Context, callsign: str): - # pass + @_ae7q_lookup.command(name="trustee", aliases=["t"], category=cmn.cat.lookup) + async def _ae7q_trustee(self, ctx: commands.Context, callsign: str): + '''Look up the licenses for which a licensee is trustee on [ae7q.com](http://ae7q.com/).''' + callsign = callsign.upper() + desc = '' + base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" + embed = cmn.embed_factory(ctx) - # @_ae7q_lookup.command(name="applications", aliases=['apps']) - # async def _ae7q_applications(self, ctx: commands.Context, callsign: str): - # pass + async with self.session.get(base_url + callsign) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q trustee command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() - # @_ae7q_lookup.command(name="frn") - # async def _ae7q_frn(self, ctx: commands.Context, frn: str): - # base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN=" - # pass + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] - # @_ae7q_lookup.command(name="licensee", aliases=["lic"]) - # async def _ae7q_licensee(self, ctx: commands.Context, frn: str): - # base_url = "http://ae7q.com/query/data/LicenseeIdHistory.php?ID=" - # pass + try: + table = tables[2] if len(tables[0][0]) == 1 else tables[1] + except IndexError: + embed.title = f"AE7Q Trustee History for {callsign}" + embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) + return + + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or not first_header.startswith("With"): + embed.title = f"AE7Q Trustee History for {callsign}" + embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[2:]) + + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q Trustee History for {callsign}" + embed.colour = cmn.colours.good + embed.url = base_url + callsign + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[3]})' # **Name** (Applicant Type) + body = (f'Name: *{row[2]}*\n' + f'Region: *{row[1]}*\n' + f'Status: *{row[4]}*\n' + f'Granted: *{row[5]}*\n' + f'Effective: *{row[6]}*\n' + f'Cancelled: *{row[7]}*\n' + f'Expires: *{row[8]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' + + embed.description = desc + + await ctx.send(embed=embed) + + @_ae7q_lookup.command(name="applications", aliases=["a"], category=cmn.cat.lookup) + async def _ae7q_applications(self, ctx: commands.Context, callsign: str): + '''Look up the applications for a callsign on [ae7q.com](http://ae7q.com/).''' + + @_ae7q_lookup.command(name="frn", aliases=["f"], category=cmn.cat.lookup) + async def _ae7q_frn(self, ctx: commands.Context, frn: str): + '''Look up the history of an FRN on [ae7q.com](http://ae7q.com/).''' + base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN=" + """ + NOTES: + - 2 tables: callsign history and application history + - If not found: no tables + """ + pass + + @_ae7q_lookup.command(name="licensee", aliases=["l"], category=cmn.cat.lookup) + async def _ae7q_licensee(self, ctx: commands.Context, frn: str): + '''Look up the history of a licensee ID on [ae7q.com](http://ae7q.com/).''' + base_url = "http://ae7q.com/query/data/LicenseeIdHistory.php?ID=" + # notes: same as FRN but with different input + pass async def process_table(table: list): - """Processes tables (including headers) and returns the processed table""" + """Processes tables (*not* including headers) and returns the processed table""" table_contents = [] - for tr in table[1:]: + for tr in table: row = [] for td in tr.find_all('td'): cell_val = td.getText().strip() From eb5e038624053c0c7d0331e5b3bc78d4a8d1502a Mon Sep 17 00:00:00 2001 From: Abigail Gold Date: Wed, 8 Jan 2020 16:20:52 -0500 Subject: [PATCH 3/6] add ae7q frn command ae7q applications is WIP, and is commented out. There is also an applications table on the FRN page, which could be used in the future Progress on #95 --- exts/ae7q.py | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 2 deletions(-) diff --git a/exts/ae7q.py b/exts/ae7q.py index 505cad8..e255618 100644 --- a/exts/ae7q.py +++ b/exts/ae7q.py @@ -173,7 +173,75 @@ class AE7QCog(commands.Cog): @_ae7q_lookup.command(name="applications", aliases=["a"], category=cmn.cat.lookup) async def _ae7q_applications(self, ctx: commands.Context, callsign: str): - '''Look up the applications for a callsign on [ae7q.com](http://ae7q.com/).''' + '''Look up the application history for a callsign on [ae7q.com](http://ae7q.com/).''' + """ + callsign = callsign.upper() + desc = '' + base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" + embed = cmn.embed_factory(ctx) + + async with self.session.get(base_url + callsign) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q applications command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + table = tables[0] + + # find the first table in the page, and use it to make a description + if len(table[0]) == 1: + for row in table: + desc += " ".join(row.getText().split()) + desc += '\n' + desc = desc.replace(callsign, f'`{callsign}`') + + # select the last table to get applications + table = tables[-1] + + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or not first_header.startswith("Receipt"): + embed.title = f"AE7Q Application History for {callsign}" + embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[1:]) + + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q Application History for {callsign}" + embed.colour = cmn.colours.good + embed.url = base_url + callsign + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[1]}** ({row[3]})' # **Name** (Callsign) + body = (f'Received: *{row[0]}*\n' + f'Region: *{row[2]}*\n' + f'Purpose: *{row[5]}*\n' + f'Last Action: *{row[7]}*\n' + f'Application Status: *{row[8]}*\n') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' + + embed.description = desc + + await ctx.send(embed=embed) + """ + pass @_ae7q_lookup.command(name="frn", aliases=["f"], category=cmn.cat.lookup) async def _ae7q_frn(self, ctx: commands.Context, frn: str): @@ -184,7 +252,60 @@ class AE7QCog(commands.Cog): - 2 tables: callsign history and application history - If not found: no tables """ - pass + desc = '' + base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN=" + embed = cmn.embed_factory(ctx) + + async with self.session.get(base_url + frn) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q frn command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + table = tables[0] + + + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or not first_header.startswith('With Licensee'): + embed.title = f"AE7Q History for FRN {frn}" + embed.colour = cmn.colours.bad + embed.url = base_url + frn + embed.description = f'No records found for FRN `{frn}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[2:]) + + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q History for FRN {frn}" + embed.colour = cmn.colours.good + embed.url = base_url + frn + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type) + body = (f'Class: *{row[4]}*\n' + f'Region: *{row[1]}*\n' + f'Status: *{row[5]}*\n' + f'Granted: *{row[6]}*\n' + f'Effective: *{row[7]}*\n' + f'Cancelled: *{row[8]}*\n' + f'Expires: *{row[9]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + embed.description = f'Records 1 to 3 of {len(table)}. See ae7q.com for more...' + + await ctx.send(embed=embed) @_ae7q_lookup.command(name="licensee", aliases=["l"], category=cmn.cat.lookup) async def _ae7q_licensee(self, ctx: commands.Context, frn: str): From 04cbc920ce7802207164772b2ab43857a42b8b6e Mon Sep 17 00:00:00 2001 From: Abigail Gold Date: Wed, 8 Jan 2020 17:07:02 -0500 Subject: [PATCH 4/6] add ae7q licensee command Progress on #95 --- exts/ae7q.py | 79 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 72 insertions(+), 7 deletions(-) diff --git a/exts/ae7q.py b/exts/ae7q.py index e4c2283..490e2b4 100644 --- a/exts/ae7q.py +++ b/exts/ae7q.py @@ -247,13 +247,11 @@ class AE7QCog(commands.Cog): @_ae7q_lookup.command(name="frn", aliases=["f"], category=cmn.cat.lookup) async def _ae7q_frn(self, ctx: commands.Context, frn: str): '''Look up the history of an FRN on [ae7q.com](http://ae7q.com/).''' - base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN=" """ NOTES: - 2 tables: callsign history and application history - If not found: no tables """ - desc = '' base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN=" embed = cmn.embed_factory(ctx) @@ -269,8 +267,15 @@ class AE7QCog(commands.Cog): soup = BeautifulSoup(page, features="html.parser") tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] - table = tables[0] + if not len(tables): + embed.title = f"AE7Q History for FRN {frn}" + embed.colour = cmn.colours.bad + embed.url = base_url + frn + embed.description = f'No records found for FRN `{frn}`' + await ctx.send(embed=embed) + return + table = tables[0] table_headers = table[0].find_all("th") first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None @@ -294,7 +299,8 @@ class AE7QCog(commands.Cog): # add the first three rows of the table to the embed for row in table[0:3]: header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type) - body = (f'Class: *{row[4]}*\n' + body = (f'Name: *{row[2]}*\n' + f'Class: *{row[4]}*\n' f'Region: *{row[1]}*\n' f'Status: *{row[5]}*\n' f'Granted: *{row[6]}*\n' @@ -309,11 +315,70 @@ class AE7QCog(commands.Cog): await ctx.send(embed=embed) @_ae7q_lookup.command(name="licensee", aliases=["l"], category=cmn.cat.lookup) - async def _ae7q_licensee(self, ctx: commands.Context, frn: str): + async def _ae7q_licensee(self, ctx: commands.Context, licensee_id: str): '''Look up the history of a licensee ID on [ae7q.com](http://ae7q.com/).''' + licensee_id = licensee_id.upper() base_url = "http://ae7q.com/query/data/LicenseeIdHistory.php?ID=" - # notes: same as FRN but with different input - pass + embed = cmn.embed_factory(ctx) + + async with self.session.get(base_url + licensee_id) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q licensee command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + if not len(tables): + embed.title = f"AE7Q History for Licensee {licensee_id}" + embed.colour = cmn.colours.bad + embed.url = base_url + licensee_id + embed.description = f'No records found for Licensee `{licensee_id}`' + await ctx.send(embed=embed) + return + + table = tables[0] + + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or not first_header.startswith('With FCC'): + embed.title = f"AE7Q History for Licensee {licensee_id}" + embed.colour = cmn.colours.bad + embed.url = base_url + licensee_id + embed.description = f'No records found for Licensee `{licensee_id}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[2:]) + + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q History for Licensee {licensee_id}" + embed.colour = cmn.colours.good + embed.url = base_url + licensee_id + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type) + body = (f'Name: *{row[2]}*\n' + f'Class: *{row[4]}*\n' + f'Region: *{row[1]}*\n' + f'Status: *{row[5]}*\n' + f'Granted: *{row[6]}*\n' + f'Effective: *{row[7]}*\n' + f'Cancelled: *{row[8]}*\n' + f'Expires: *{row[9]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + embed.description = f'Records 1 to 3 of {len(table)}. See ae7q.com for more...' + + await ctx.send(embed=embed) async def process_table(table: list): From 2c3535d99e968709a66c8db7613815a0a5133bda Mon Sep 17 00:00:00 2001 From: Abigail Gold Date: Wed, 8 Jan 2020 17:18:53 -0500 Subject: [PATCH 5/6] update changelog for additional ae7q commands Fixes #95 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 794d353..8f1c864 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Added - Added Trustee field to qrz command for club callsigns. - Added alias for `ae7q call` command (`ae7q c`). +- Added ae7q lookup by FRN and Licensee ID, and for trustee records (`ae7q frn, licensee, trustee`). ### Changed - Changelog command to accept a version as argument. - The qrz command can now link to a QRZ page instead of embedding the data with the `--link` flag. From 58c69f5aebc2cc0c6a774b3a1986d6b4a5494ea2 Mon Sep 17 00:00:00 2001 From: Abigail Gold Date: Wed, 8 Jan 2020 17:47:34 -0500 Subject: [PATCH 6/6] add typing context to ae7q commands --- exts/ae7q.py | 547 ++++++++++++++++++++++++++------------------------- 1 file changed, 276 insertions(+), 271 deletions(-) diff --git a/exts/ae7q.py b/exts/ae7q.py index 490e2b4..bfcbcb6 100644 --- a/exts/ae7q.py +++ b/exts/ae7q.py @@ -36,211 +36,214 @@ class AE7QCog(commands.Cog): @_ae7q_lookup.command(name="call", aliases=["c"], category=cmn.cat.lookup) async def _ae7q_call(self, ctx: commands.Context, callsign: str): '''Look up the history of a callsign on [ae7q.com](http://ae7q.com/).''' - callsign = callsign.upper() - desc = '' - base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" - embed = cmn.embed_factory(ctx) + with ctx.typing(): + callsign = callsign.upper() + desc = '' + base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" + embed = cmn.embed_factory(ctx) - async with self.session.get(base_url + callsign) as resp: - if resp.status != 200: - embed.title = "Error in AE7Q call command" - embed.description = 'Could not load AE7Q' + async with self.session.get(base_url + callsign) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q call command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + table = tables[0] + + # find the first table in the page, and use it to make a description + if len(table[0]) == 1: + for row in table: + desc += " ".join(row.getText().split()) + desc += '\n' + desc = desc.replace(callsign, f'`{callsign}`') + table = tables[1] + + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or first_header != 'Entity Name': + embed.title = f"AE7Q History for {callsign}" embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' await ctx.send(embed=embed) return - page = await resp.text() - soup = BeautifulSoup(page, features="html.parser") - tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + table = await process_table(table[1:]) - table = tables[0] - - # find the first table in the page, and use it to make a description - if len(table[0]) == 1: - for row in table: - desc += " ".join(row.getText().split()) - desc += '\n' - desc = desc.replace(callsign, f'`{callsign}`') - table = tables[1] - - table_headers = table[0].find_all("th") - first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None - - # catch if the wrong table was selected - if first_header is None or first_header != 'Entity Name': + embed = cmn.embed_factory(ctx) embed.title = f"AE7Q History for {callsign}" - embed.colour = cmn.colours.bad + embed.colour = cmn.colours.good embed.url = base_url + callsign + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[1]})' # **Name** (Applicant Type) + body = (f'Class: *{row[2]}*\n' + f'Region: *{row[3]}*\n' + f'Status: *{row[4]}*\n' + f'Granted: *{row[5]}*\n' + f'Effective: *{row[6]}*\n' + f'Cancelled: *{row[7]}*\n' + f'Expires: *{row[8]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' + embed.description = desc - embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) - return - - table = await process_table(table[1:]) - - embed = cmn.embed_factory(ctx) - embed.title = f"AE7Q History for {callsign}" - embed.colour = cmn.colours.good - embed.url = base_url + callsign - - # add the first three rows of the table to the embed - for row in table[0:3]: - header = f'**{row[0]}** ({row[1]})' # **Name** (Applicant Type) - body = (f'Class: *{row[2]}*\n' - f'Region: *{row[3]}*\n' - f'Status: *{row[4]}*\n' - f'Granted: *{row[5]}*\n' - f'Effective: *{row[6]}*\n' - f'Cancelled: *{row[7]}*\n' - f'Expires: *{row[8]}*') - embed.add_field(name=header, value=body, inline=False) - - if len(table) > 3: - desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' - - embed.description = desc - - await ctx.send(embed=embed) @_ae7q_lookup.command(name="trustee", aliases=["t"], category=cmn.cat.lookup) async def _ae7q_trustee(self, ctx: commands.Context, callsign: str): '''Look up the licenses for which a licensee is trustee on [ae7q.com](http://ae7q.com/).''' - callsign = callsign.upper() - desc = '' - base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" - embed = cmn.embed_factory(ctx) + with ctx.typing(): + callsign = callsign.upper() + desc = '' + base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" + embed = cmn.embed_factory(ctx) - async with self.session.get(base_url + callsign) as resp: - if resp.status != 200: - embed.title = "Error in AE7Q trustee command" - embed.description = 'Could not load AE7Q' + async with self.session.get(base_url + callsign) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q trustee command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + try: + table = tables[2] if len(tables[0][0]) == 1 else tables[1] + except IndexError: + embed.title = f"AE7Q Trustee History for {callsign}" embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' await ctx.send(embed=embed) return - page = await resp.text() - soup = BeautifulSoup(page, features="html.parser") - tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None - try: - table = tables[2] if len(tables[0][0]) == 1 else tables[1] - except IndexError: + # catch if the wrong table was selected + if first_header is None or not first_header.startswith("With"): + embed.title = f"AE7Q Trustee History for {callsign}" + embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[2:]) + + embed = cmn.embed_factory(ctx) embed.title = f"AE7Q Trustee History for {callsign}" - embed.colour = cmn.colours.bad + embed.colour = cmn.colours.good embed.url = base_url + callsign + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[3]})' # **Name** (Applicant Type) + body = (f'Name: *{row[2]}*\n' + f'Region: *{row[1]}*\n' + f'Status: *{row[4]}*\n' + f'Granted: *{row[5]}*\n' + f'Effective: *{row[6]}*\n' + f'Cancelled: *{row[7]}*\n' + f'Expires: *{row[8]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' + embed.description = desc - embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) - return - - table_headers = table[0].find_all("th") - first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None - - # catch if the wrong table was selected - if first_header is None or not first_header.startswith("With"): - embed.title = f"AE7Q Trustee History for {callsign}" - embed.colour = cmn.colours.bad - embed.url = base_url + callsign - embed.description = desc - embed.description += f'\nNo records found for `{callsign}`' - await ctx.send(embed=embed) - return - - table = await process_table(table[2:]) - - embed = cmn.embed_factory(ctx) - embed.title = f"AE7Q Trustee History for {callsign}" - embed.colour = cmn.colours.good - embed.url = base_url + callsign - - # add the first three rows of the table to the embed - for row in table[0:3]: - header = f'**{row[0]}** ({row[3]})' # **Name** (Applicant Type) - body = (f'Name: *{row[2]}*\n' - f'Region: *{row[1]}*\n' - f'Status: *{row[4]}*\n' - f'Granted: *{row[5]}*\n' - f'Effective: *{row[6]}*\n' - f'Cancelled: *{row[7]}*\n' - f'Expires: *{row[8]}*') - embed.add_field(name=header, value=body, inline=False) - - if len(table) > 3: - desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' - - embed.description = desc - - await ctx.send(embed=embed) @_ae7q_lookup.command(name="applications", aliases=["a"], category=cmn.cat.lookup) async def _ae7q_applications(self, ctx: commands.Context, callsign: str): '''Look up the application history for a callsign on [ae7q.com](http://ae7q.com/).''' """ - callsign = callsign.upper() - desc = '' - base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" - embed = cmn.embed_factory(ctx) + with ctx.typing(): + callsign = callsign.upper() + desc = '' + base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" + embed = cmn.embed_factory(ctx) - async with self.session.get(base_url + callsign) as resp: - if resp.status != 200: - embed.title = "Error in AE7Q applications command" - embed.description = 'Could not load AE7Q' + async with self.session.get(base_url + callsign) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q applications command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + table = tables[0] + + # find the first table in the page, and use it to make a description + if len(table[0]) == 1: + for row in table: + desc += " ".join(row.getText().split()) + desc += '\n' + desc = desc.replace(callsign, f'`{callsign}`') + + # select the last table to get applications + table = tables[-1] + + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or not first_header.startswith("Receipt"): + embed.title = f"AE7Q Application History for {callsign}" embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' await ctx.send(embed=embed) return - page = await resp.text() - soup = BeautifulSoup(page, features="html.parser") - tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + table = await process_table(table[1:]) - table = tables[0] - - # find the first table in the page, and use it to make a description - if len(table[0]) == 1: - for row in table: - desc += " ".join(row.getText().split()) - desc += '\n' - desc = desc.replace(callsign, f'`{callsign}`') - - # select the last table to get applications - table = tables[-1] - - table_headers = table[0].find_all("th") - first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None - - # catch if the wrong table was selected - if first_header is None or not first_header.startswith("Receipt"): + embed = cmn.embed_factory(ctx) embed.title = f"AE7Q Application History for {callsign}" - embed.colour = cmn.colours.bad + embed.colour = cmn.colours.good embed.url = base_url + callsign + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[1]}** ({row[3]})' # **Name** (Callsign) + body = (f'Received: *{row[0]}*\n' + f'Region: *{row[2]}*\n' + f'Purpose: *{row[5]}*\n' + f'Last Action: *{row[7]}*\n' + f'Application Status: *{row[8]}*\n') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' + embed.description = desc - embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) - return - - table = await process_table(table[1:]) - - embed = cmn.embed_factory(ctx) - embed.title = f"AE7Q Application History for {callsign}" - embed.colour = cmn.colours.good - embed.url = base_url + callsign - - # add the first three rows of the table to the embed - for row in table[0:3]: - header = f'**{row[1]}** ({row[3]})' # **Name** (Callsign) - body = (f'Received: *{row[0]}*\n' - f'Region: *{row[2]}*\n' - f'Purpose: *{row[5]}*\n' - f'Last Action: *{row[7]}*\n' - f'Application Status: *{row[8]}*\n') - embed.add_field(name=header, value=body, inline=False) - - if len(table) > 3: - desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' - - embed.description = desc - - await ctx.send(embed=embed) """ pass @@ -252,133 +255,135 @@ class AE7QCog(commands.Cog): - 2 tables: callsign history and application history - If not found: no tables """ - base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN=" - embed = cmn.embed_factory(ctx) + with ctx.typing(): + base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN=" + embed = cmn.embed_factory(ctx) - async with self.session.get(base_url + frn) as resp: - if resp.status != 200: - embed.title = "Error in AE7Q frn command" - embed.description = 'Could not load AE7Q' + async with self.session.get(base_url + frn) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q frn command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + if not len(tables): + embed.title = f"AE7Q History for FRN {frn}" embed.colour = cmn.colours.bad + embed.url = base_url + frn + embed.description = f'No records found for FRN `{frn}`' await ctx.send(embed=embed) return - page = await resp.text() - soup = BeautifulSoup(page, features="html.parser") - tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + table = tables[0] - if not len(tables): + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or not first_header.startswith('With Licensee'): + embed.title = f"AE7Q History for FRN {frn}" + embed.colour = cmn.colours.bad + embed.url = base_url + frn + embed.description = f'No records found for FRN `{frn}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[2:]) + + embed = cmn.embed_factory(ctx) embed.title = f"AE7Q History for FRN {frn}" - embed.colour = cmn.colours.bad + embed.colour = cmn.colours.good embed.url = base_url + frn - embed.description = f'No records found for FRN `{frn}`' + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type) + body = (f'Name: *{row[2]}*\n' + f'Class: *{row[4]}*\n' + f'Region: *{row[1]}*\n' + f'Status: *{row[5]}*\n' + f'Granted: *{row[6]}*\n' + f'Effective: *{row[7]}*\n' + f'Cancelled: *{row[8]}*\n' + f'Expires: *{row[9]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + embed.description = f'Records 1 to 3 of {len(table)}. See ae7q.com for more...' + await ctx.send(embed=embed) - return - - table = tables[0] - - table_headers = table[0].find_all("th") - first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None - - # catch if the wrong table was selected - if first_header is None or not first_header.startswith('With Licensee'): - embed.title = f"AE7Q History for FRN {frn}" - embed.colour = cmn.colours.bad - embed.url = base_url + frn - embed.description = f'No records found for FRN `{frn}`' - await ctx.send(embed=embed) - return - - table = await process_table(table[2:]) - - embed = cmn.embed_factory(ctx) - embed.title = f"AE7Q History for FRN {frn}" - embed.colour = cmn.colours.good - embed.url = base_url + frn - - # add the first three rows of the table to the embed - for row in table[0:3]: - header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type) - body = (f'Name: *{row[2]}*\n' - f'Class: *{row[4]}*\n' - f'Region: *{row[1]}*\n' - f'Status: *{row[5]}*\n' - f'Granted: *{row[6]}*\n' - f'Effective: *{row[7]}*\n' - f'Cancelled: *{row[8]}*\n' - f'Expires: *{row[9]}*') - embed.add_field(name=header, value=body, inline=False) - - if len(table) > 3: - embed.description = f'Records 1 to 3 of {len(table)}. See ae7q.com for more...' - - await ctx.send(embed=embed) @_ae7q_lookup.command(name="licensee", aliases=["l"], category=cmn.cat.lookup) async def _ae7q_licensee(self, ctx: commands.Context, licensee_id: str): '''Look up the history of a licensee ID on [ae7q.com](http://ae7q.com/).''' - licensee_id = licensee_id.upper() - base_url = "http://ae7q.com/query/data/LicenseeIdHistory.php?ID=" - embed = cmn.embed_factory(ctx) + with ctx.typing(): + licensee_id = licensee_id.upper() + base_url = "http://ae7q.com/query/data/LicenseeIdHistory.php?ID=" + embed = cmn.embed_factory(ctx) - async with self.session.get(base_url + licensee_id) as resp: - if resp.status != 200: - embed.title = "Error in AE7Q licensee command" - embed.description = 'Could not load AE7Q' + async with self.session.get(base_url + licensee_id) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q licensee command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + if not len(tables): + embed.title = f"AE7Q History for Licensee {licensee_id}" embed.colour = cmn.colours.bad + embed.url = base_url + licensee_id + embed.description = f'No records found for Licensee `{licensee_id}`' await ctx.send(embed=embed) return - page = await resp.text() - soup = BeautifulSoup(page, features="html.parser") - tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + table = tables[0] - if not len(tables): + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or not first_header.startswith('With FCC'): + embed.title = f"AE7Q History for Licensee {licensee_id}" + embed.colour = cmn.colours.bad + embed.url = base_url + licensee_id + embed.description = f'No records found for Licensee `{licensee_id}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[2:]) + + embed = cmn.embed_factory(ctx) embed.title = f"AE7Q History for Licensee {licensee_id}" - embed.colour = cmn.colours.bad + embed.colour = cmn.colours.good embed.url = base_url + licensee_id - embed.description = f'No records found for Licensee `{licensee_id}`' + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type) + body = (f'Name: *{row[2]}*\n' + f'Class: *{row[4]}*\n' + f'Region: *{row[1]}*\n' + f'Status: *{row[5]}*\n' + f'Granted: *{row[6]}*\n' + f'Effective: *{row[7]}*\n' + f'Cancelled: *{row[8]}*\n' + f'Expires: *{row[9]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + embed.description = f'Records 1 to 3 of {len(table)}. See ae7q.com for more...' + await ctx.send(embed=embed) - return - - table = tables[0] - - table_headers = table[0].find_all("th") - first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None - - # catch if the wrong table was selected - if first_header is None or not first_header.startswith('With FCC'): - embed.title = f"AE7Q History for Licensee {licensee_id}" - embed.colour = cmn.colours.bad - embed.url = base_url + licensee_id - embed.description = f'No records found for Licensee `{licensee_id}`' - await ctx.send(embed=embed) - return - - table = await process_table(table[2:]) - - embed = cmn.embed_factory(ctx) - embed.title = f"AE7Q History for Licensee {licensee_id}" - embed.colour = cmn.colours.good - embed.url = base_url + licensee_id - - # add the first three rows of the table to the embed - for row in table[0:3]: - header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type) - body = (f'Name: *{row[2]}*\n' - f'Class: *{row[4]}*\n' - f'Region: *{row[1]}*\n' - f'Status: *{row[5]}*\n' - f'Granted: *{row[6]}*\n' - f'Effective: *{row[7]}*\n' - f'Cancelled: *{row[8]}*\n' - f'Expires: *{row[9]}*') - embed.add_field(name=header, value=body, inline=False) - - if len(table) > 3: - embed.description = f'Records 1 to 3 of {len(table)}. See ae7q.com for more...' - - await ctx.send(embed=embed) async def process_table(table: list):