From fed97a03b31f7903a4066f6b047a48bf12ff8b5e Mon Sep 17 00:00:00 2001 From: Abigail Date: Wed, 22 Apr 2020 17:58:27 -0400 Subject: [PATCH] update ae7q extension to use ae7qparser Fixes #195 Fixes #186 Fixes #168 --- exts/ae7q.py | 651 ++++++++++++++++++++++++++--------------------- requirements.txt | 1 + 2 files changed, 357 insertions(+), 295 deletions(-) diff --git a/exts/ae7q.py b/exts/ae7q.py index b11aeeb..66c8082 100644 --- a/exts/ae7q.py +++ b/exts/ae7q.py @@ -16,8 +16,10 @@ the GNU General Public License, version 2. # KC4USA: reserved, no call history, *but* has application history +import re + import aiohttp -from bs4 import BeautifulSoup +import ae7qparser import discord.ext.commands as commands @@ -40,64 +42,86 @@ class AE7QCog(commands.Cog): """Looks up the history of a callsign on [ae7q.com](http://ae7q.com/).""" with ctx.typing(): callsign = callsign.upper() - desc = "" - base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" - embed = cmn.embed_factory(ctx) - async with self.session.get(base_url + callsign) as resp: - if resp.status != 200: - raise cmn.BotHTTPError(resp) - page = await resp.text() - - soup = BeautifulSoup(page, features="html.parser") - tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] - - table = tables[0] - - # find the first table in the page, and use it to make a description - if len(table[0]) == 1: - for row in table: - desc += " ".join(row.getText().split()) - desc += "\n" - desc = desc.replace(callsign, f"`{callsign}`") - table = tables[1] - - table_headers = table[0].find_all("th") - first_header = "".join(table_headers[0].strings) if len(table_headers) > 0 else None - - # catch if the wrong table was selected - if first_header is None or first_header != "Entity Name": - embed.title = f"AE7Q History for {callsign}" - embed.colour = cmn.colours.bad - embed.url = base_url + callsign - embed.description = desc - embed.description += f"\nNo records found for `{callsign}`" - await ctx.send(embed=embed) - return - - table = await process_table(table[1:]) + call_data = ae7qparser.get_call(callsign) embed = cmn.embed_factory(ctx) - embed.title = f"AE7Q History for {callsign}" + embed.title = f"AE7Q Callsign History for {callsign}" + embed.url = call_data.query_url embed.colour = cmn.colours.good - embed.url = base_url + callsign + embed.description = "" - # add the first three rows of the table to the embed - for row in table[0:3]: - header = f"**{row[0]}** ({row[1]})" # **Name** (Applicant Type) - body = (f"Class: *{row[2]}*\n" - f"Region: *{row[3]}*\n" - f"Status: *{row[4]}*\n" - f"Granted: *{row[5]}*\n" - f"Effective: *{row[6]}*\n" - f"Cancelled: *{row[7]}*\n" - f"Expires: *{row[8]}*") - embed.add_field(name=header, value=body, inline=False) + if isinstance(call_data, ae7qparser.Ae7qCallData): + if call_data.conditions: + embed.description = " ".join([ + " ".join([y.strip() for y in x]) for x in call_data.conditions + ]).replace(callsign, f"`{callsign}`") - if len(table) > 3: - desc += f"\nRecords 1 to 3 of {len(table)}. See ae7q.com for more..." + if not call_data.call_history: + if not call_data.event_callsign_history: + embed.colour = cmn.colours.bad + embed.description += f"\nNo records found for `{callsign}`" + else: + # add the first five rows of the event callsign history to the embed + for row in call_data.event_callsign_history[0:5]: + header = f"**{row.start_date:%Y-%m-%d}-{row.end_date:%Y-%m-%d}**" + body = (f"Requestor: *{row.entity_name} ({row.callsign})*\n" + f"Event: *{row.event_name}*") + embed.add_field(name=header, value=body, inline=False) - embed.description = desc + if len(call_data.event_callsign_history) > 5: + embed.description += (f"\nRecords 1 to 5 of {len(call_data.event_callsign_history)}. " + f"See [ae7q.com]({call_data.query_url}) for more...") + + else: + # add the first three rows of the callsign history to the embed + for row in call_data.call_history[0:3]: + header = f"**{row.entity_name}** ({row.applicant_type})" + body = (f"Class: *{row.operator_class}*\n" + f"Region: *{row.region_state}*\n" + f"Status: *{row.license_status}*\n") + if row.grant_date: + body += f"Granted: *{row.grant_date:%Y-%m-%d}*\n" + if row.effective_date: + body += f"Effective: *{row.effective_date:%Y-%m-%d}*\n" + if row.cancel_date: + body += f"Cancelled: *{row.cancel_date:%Y-%m-%d}*\n" + if row.expire_date: + body += f"Expires: *{row.expire_date:%Y-%m-%d}*\n" + + embed.add_field(name=header, value=body, inline=False) + + if len(call_data.call_history) > 3: + embed.description += (f"\nRecords 1 to 3 of {len(call_data.call_history)}. " + f"See [ae7q.com]({call_data.query_url}) for more...") + + elif isinstance(call_data, ae7qparser.Ae7qCanadianCallData): + if not call_data.callsign_data: + embed.colour = cmn.colours.bad + embed.description += f"\nNo records found for `{callsign}`" + else: + if call_data.given_names != "" and call_data.surname != "": + embed.add_field(name="Name", value=f"{call_data.given_names} {call_data.surname}", inline=True) + if call_data.address != "": + embed.add_field(name="Address", value=call_data.address, inline=True) + if call_data.locality != "": + embed.add_field(name="Locality", value=call_data.locality, inline=True) + if call_data.province != "": + embed.add_field(name="Province", value=call_data.province, inline=True) + if call_data.postal_code != "": + embed.add_field(name="Postal Code", value=call_data.postal_code, inline=True) + if call_data.country != "": + embed.add_field(name="Country", value=call_data.country, inline=True) + if call_data.region != "": + embed.add_field(name="Region", value=call_data.region, inline=True) + if call_data.grid_square != "": + embed.add_field(name="Grid Square", value=call_data.grid_square, inline=True) + if call_data.qualifications != "": + embed.add_field(name="License Qualifications", value=call_data.qualifications, inline=True) + + else: + embed.colour = cmn.colours.bad + embed.description += f"\nNo records found for `{callsign}`" await ctx.send(embed=embed) @@ -106,202 +130,283 @@ class AE7QCog(commands.Cog): """Looks up the licenses for which a licensee is trustee on [ae7q.com](http://ae7q.com/).""" with ctx.typing(): callsign = callsign.upper() - desc = "" - base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" - embed = cmn.embed_factory(ctx) - async with self.session.get(base_url + callsign) as resp: - if resp.status != 200: - raise cmn.BotHTTPError(resp) - page = await resp.text() - - soup = BeautifulSoup(page, features="html.parser") - tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] - - try: - table = tables[2] if len(tables[0][0]) == 1 else tables[1] - except IndexError: - embed.title = f"AE7Q Trustee History for {callsign}" - embed.colour = cmn.colours.bad - embed.url = base_url + callsign - embed.description = desc - embed.description += f"\nNo records found for `{callsign}`" - await ctx.send(embed=embed) - return - - table_headers = table[0].find_all("th") - first_header = "".join(table_headers[0].strings) if len(table_headers) > 0 else None - - # catch if the wrong table was selected - if first_header is None or not first_header.startswith("With"): - embed.title = f"AE7Q Trustee History for {callsign}" - embed.colour = cmn.colours.bad - embed.url = base_url + callsign - embed.description = desc - embed.description += f"\nNo records found for `{callsign}`" - await ctx.send(embed=embed) - return - - table = await process_table(table[2:]) + call_data = ae7qparser.get_call(callsign) embed = cmn.embed_factory(ctx) embed.title = f"AE7Q Trustee History for {callsign}" + embed.url = call_data.query_url embed.colour = cmn.colours.good - embed.url = base_url + callsign + embed.description = "" - # add the first three rows of the table to the embed - for row in table[0:3]: - header = f"**{row[0]}** ({row[3]})" # **Name** (Applicant Type) - body = (f"Name: *{row[2]}*\n" - f"Region: *{row[1]}*\n" - f"Status: *{row[4]}*\n" - f"Granted: *{row[5]}*\n" - f"Effective: *{row[6]}*\n" - f"Cancelled: *{row[7]}*\n" - f"Expires: *{row[8]}*") - embed.add_field(name=header, value=body, inline=False) + if isinstance(call_data, ae7qparser.Ae7qCallData): + if not call_data.trustee_history: + embed.colour = cmn.colours.bad + embed.description += f"\nNo records found for `{callsign}`" + else: + # add the first three rows of the trustee history to the embed + for row in call_data.trustee_history[0:3]: + header = f"**{row.callsign}** ({row.applicant_type})" + body = (f"Name: *{row.entity_name}*\n" + f"Region: *{row.region_state}*\n" + f"Status: *{row.license_status}*\n") + if row.grant_date: + body += f"Granted: *{row.grant_date:%Y-%m-%d}*\n" + if row.effective_date: + body += f"Effective: *{row.effective_date:%Y-%m-%d}*\n" + if row.cancel_date: + body += f"Cancelled: *{row.cancel_date:%Y-%m-%d}*\n" + if row.expire_date: + body += f"Expires: *{row.expire_date:%Y-%m-%d}*\n" - if len(table) > 3: - desc += f"\nRecords 1 to 3 of {len(table)}. See ae7q.com for more..." + embed.add_field(name=header, value=body, inline=False) - embed.description = desc + if len(call_data.trustee_history) > 3: + embed.description += (f"\nRecords 1 to 3 of {len(call_data.trustee_history)}. " + f"See [ae7q.com]({call_data.query_url}) for more...") - await ctx.send(embed=embed) - - @_ae7q_lookup.command(name="applications", aliases=["a"], category=cmn.cat.lookup) - async def _ae7q_applications(self, ctx: commands.Context, callsign: str): - """Looks up the application history for a callsign on [ae7q.com](http://ae7q.com/).""" - """ - with ctx.typing(): - callsign = callsign.upper() - desc = "" - base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" - embed = cmn.embed_factory(ctx) - - async with self.session.get(base_url + callsign) as resp: - if resp.status != 200: - raise cmn.BotHTTPError(resp) - page = await resp.text() - - soup = BeautifulSoup(page, features="html.parser") - tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] - - table = tables[0] - - # find the first table in the page, and use it to make a description - if len(table[0]) == 1: - for row in table: - desc += " ".join(row.getText().split()) - desc += "\n" - desc = desc.replace(callsign, f"`{callsign}`") - - # select the last table to get applications - table = tables[-1] - - table_headers = table[0].find_all("th") - first_header = "".join(table_headers[0].strings) if len(table_headers) > 0 else None - - # catch if the wrong table was selected - if first_header is None or not first_header.startswith("Receipt"): - embed.title = f"AE7Q Application History for {callsign}" + else: embed.colour = cmn.colours.bad - embed.url = base_url + callsign - embed.description = desc embed.description += f"\nNo records found for `{callsign}`" - await ctx.send(embed=embed) - return - - table = await process_table(table[1:]) - - embed = cmn.embed_factory(ctx) - embed.title = f"AE7Q Application History for {callsign}" - embed.colour = cmn.colours.good - embed.url = base_url + callsign - - # add the first three rows of the table to the embed - for row in table[0:3]: - header = f"**{row[1]}** ({row[3]})" # **Name** (Callsign) - body = (f"Received: *{row[0]}*\n" - f"Region: *{row[2]}*\n" - f"Purpose: *{row[5]}*\n" - f"Last Action: *{row[7]}*\n" - f"Application Status: *{row[8]}*\n") - embed.add_field(name=header, value=body, inline=False) - - if len(table) > 3: - desc += f"\nRecords 1 to 3 of {len(table)}. See ae7q.com for more..." - - embed.description = desc await ctx.send(embed=embed) - """ - raise NotImplementedError("Application history lookup not yet supported. " - "Check back in a later version of the bot.") + + @_ae7q_lookup.command(name="applications", aliases=["a", "apps"], category=cmn.cat.lookup) + async def _ae7q_applications(self, ctx: commands.Context, query: str): + """Looks up the application history for a callsign, FRN, or Licensee ID on [ae7q.com](http://ae7q.com/).""" + with ctx.typing(): + query = query.upper() + + # LID + if re.match(r"L\d+", query): + data = ae7qparser.get_licensee_id(query) + # FRN + elif re.match(r"\d{10}", query): + data = ae7qparser.get_frn(query) + # callsign + else: + data = ae7qparser.get_call(query) + + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q Application History for {query}" + embed.url = data.query_url + embed.colour = cmn.colours.good + embed.description = "" + + if not data.application_history: + embed.colour = cmn.colours.bad + embed.description += f"\nNo records found for `{query}`" + else: + # add the first three rows of the app history to the embed + if isinstance(data.application_history, ae7qparser.ApplicationsHistoryTable): + for row in data.application_history[0:3]: + header = f"**{row.uls_file_number[0]}** ({row.receipt_date:%Y-%m-%d})" + body = (f"Name: *{row.entity_name}*\n" + f"Callsign: *{row.application_callsign}*\n" + f"Region: *{row.region_state}*\n" + f"Purpose: *{row.application_purpose}*\n") + if row.payment_date: + body += f"Payment: *{row.payment_date:%Y-%m-%d}*\n" + if row.last_action_date: + body += f"Last Action: *{row.last_action_date:%Y-%m-%d}*\n" + body += f"Status: *{row.application_status}*" + embed.add_field(name=header, value=body, inline=False) + + elif isinstance(data.application_history, ae7qparser.VanityApplicationsHistoryTable): + for row in data.application_history[0:3]: + header = f"**{row.uls_file_number[0]}** ({row.receipt_date:%Y-%m-%d})" + body = (f"Callsign: *{row.application_callsign}*\n" + f"Region: *{row.region_state}*\n" + f"Operator Class: *{row.operator_class}*\n" + f"Purpose: *{row.application_purpose}*\n") + if row.payment_date: + body += f"Payment: *{row.payment_date:%Y-%m-%d}*\n" + if row.last_action_date: + body += f"Last Action: *{row.last_action_date:%Y-%m-%d}*\n" + body += f"Status: *{row.application_status}*\n" + if row.applied_callsigns: + body += (f"Callsign{'s' if len(row.applied_callsigns) > 1 else ''} " + f"Applied For: *{', '.join(row.applied_callsigns)}*") + embed.add_field(name=header, value=body, inline=False) + + if len(data.application_history) > 3: + embed.description += (f"\nRecords 1 to 3 of {len(data.application_history)}. " + f"See [ae7q.com]({data.query_url}) for more...") + + await ctx.send(embed=embed) + + @_ae7q_lookup.command(name="pending_apps", aliases=["pa"], category=cmn.cat.lookup) + async def _ae7q_pending_applications(self, ctx: commands.Context, query: str): + """Looks up the pending applications for a callsign, FRN, or Licensee ID on [ae7q.com](http://ae7q.com/).""" + with ctx.typing(): + query = query.upper() + + # LID + if re.match(r"L\d+", query): + data = ae7qparser.get_licensee_id(query) + # FRN + elif re.match(r"\d{10}", query): + data = ae7qparser.get_frn(query) + # callsign + else: + data = ae7qparser.get_call(query) + + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q Pending Applications for {query}" + embed.url = data.query_url + embed.colour = cmn.colours.good + embed.description = "" + + if not data.pending_applications: + embed.colour = cmn.colours.bad + embed.description += f"\nNo records found for `{query}`" + else: + # add the first three rows of the pending apps to the embed + if isinstance(data.pending_applications, ae7qparser.PendingApplicationsPredictionsTable): + for row in data.pending_applications[0:3]: + header = f"**{row.uls_file_number}** ({row.receipt_date:%Y-%m-%d})" + body = (f"Callsign: *{row.applicant_callsign}*\n" + f"Region: *{row.region_state}*\n" + f"Operator Class: *{row.operator_class}*\n" + f"Vanity Type: *{row.vanity_type}*\n") + if row.process_date: + body += f"Processes On: *{row.process_date:%Y-%m-%d}*\n" + body += (f"Sequential #: *{row.sequential_number}*\n" + f"Vanity Callsign: *{row.vanity_callsign}*\n" + f"Prediction: *{row.prediction}*") + embed.add_field(name=header, value=body, inline=False) + + elif isinstance(data.pending_applications, ae7qparser.CallsignPendingApplicationsPredictionsTable): + for row in data.pending_applications[0:3]: + header = f"**{row.uls_file_number}** ({row.receipt_date:%Y-%m-%d})" + body = (f"Callsign: *{row.applicant_callsign}*\n" + f"Region: *{row.region_state}*\n" + f"Operator Class: *{row.operator_class}*\n" + f"Vanity Type: *{row.vanity_type}*\n") + if row.process_date: + body += f"Processes On: *{row.process_date:%Y-%m-%d}*\n" + body += (f"Sequential #: *{row.sequential_number}*\n" + f"Prediction: *{row.prediction}*") + embed.add_field(name=header, value=body, inline=False) + + if len(data.pending_applications) > 3: + embed.description += (f"\nRecords 1 to 3 of {len(data.pending_applications)}. " + f"See [ae7q.com]({data.query_url}) for more...") + + await ctx.send(embed=embed) + + @_ae7q_lookup.command(name="app_detail", aliases=["ad"], category=cmn.cat.lookup) + async def _ae7q_app_detail(self, ctx: commands.Context, ufn: str): + """Looks up the application data for a ULS file number on [ae7q.com](http://ae7q.com/).""" + with ctx.typing(): + app_data = ae7qparser.get_application(ufn) + + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q Application Data for {ufn}" + embed.url = app_data.query_url + embed.colour = cmn.colours.good + + if not app_data.application_data: + embed.colour = cmn.colours.bad + embed.description += f"\nNo records found for `{ufn}`" + else: + if app_data.frn: + embed.add_field(name="FRN", value=app_data.frn, inline=True) + if app_data.licensee_id: + embed.add_field(name="Licensee ID", value=app_data.licensee_id, inline=True) + if app_data.applicant_type: + embed.add_field(name="Applicant Type", value=app_data.applicant_type, inline=True) + if app_data.entity_type: + embed.add_field(name="Entity Type", value=app_data.entity_type, inline=True) + if app_data.entity_name: + embed.add_field(name="Name", value=app_data.entity_name, inline=True) + if app_data.callsign: + embed.add_field(name="Callsign", value=app_data.callsign, inline=True) + + address = f"ATTN: {app_data.attention}\n" if app_data.attention else "" + address += f"{app_data.street_address}\n" if app_data.street_address else "" + address += f"{app_data.po_box}\n" if app_data.po_box else "" + address += (f"{', '.join([app_data.locality, app_data.state.split('-')[0].strip()])}" + f" {app_data.postal_code}") + if address: + embed.add_field(name="Address", value=address, inline=True) + if app_data.county: + embed.add_field(name="County", value=app_data.county, inline=True) + if app_data.maidenhead: + embed.add_field(name="Grid Square", value=app_data.maidenhead, inline=True) + if app_data.uls_geo_region: + embed.add_field(name="Region", value=app_data.uls_geo_region, inline=True) + if app_data.last_action_date: + embed.add_field(name="Last Action", value=f"{app_data.last_action_date:%Y-%m-%d}", inline=True) + if app_data.receipt_date: + embed.add_field(name="Receipt Date", value=f"{app_data.receipt_date:%Y-%m-%d}", inline=True) + if app_data.entered_timestamp: + embed.add_field(name="Entered Time", value=f"{app_data.entered_timestamp:%Y-%m-%d}", inline=True) + if app_data.application_source: + embed.add_field(name="Application Source", value=app_data.application_source, inline=True) + if app_data.application_purpose: + embed.add_field(name="Purpose", value=app_data.application_purpose, inline=True) + if app_data.result: + embed.add_field(name="Result", value=app_data.result, inline=True) + if app_data.fee_control_number: + embed.add_field(name="Fee Control Number", value=app_data.fee_control_number, inline=True) + if app_data.payment_date: + embed.add_field(name="Payment Date", value=f"{app_data.payment_date:%Y-%m-%d}", inline=True) + if app_data.operator_class: + embed.add_field(name="Operator Class", value=app_data.operator_class, inline=True) + if app_data.operator_group: + embed.add_field(name="Operator Group", value=app_data.operator_group, inline=True) + if app_data.uls_group: + embed.add_field(name="ULS Group", value=app_data.uls_group, inline=True) + if app_data.vanity_type: + embed.add_field(name="Vanity Type", value=app_data.vanity_type, inline=True) + if app_data.vanity_relationship: + embed.add_field(name="Vanity Relationship", value=app_data.vanity_relationship, inline=True) + if app_data.trustee_name and app_data.trustee_callsign: + embed.add_field(name="Trustee", + value=f"{app_data.trustee_name} ({app_data.trustee_callsign})", + inline=True) + + await ctx.send(embed=embed) @_ae7q_lookup.command(name="frn", aliases=["f"], category=cmn.cat.lookup) async def _ae7q_frn(self, ctx: commands.Context, frn: str): """Looks up the history of an FRN on [ae7q.com](http://ae7q.com/).""" - """ - NOTES: - - 2 tables: callsign history and application history - - If not found: no tables - """ with ctx.typing(): - base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN=" - embed = cmn.embed_factory(ctx) + frn = frn.upper() - async with self.session.get(base_url + frn) as resp: - if resp.status != 200: - raise cmn.BotHTTPError(resp) - page = await resp.text() - - soup = BeautifulSoup(page, features="html.parser") - tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] - - if not len(tables): - embed.title = f"AE7Q History for FRN {frn}" - embed.colour = cmn.colours.bad - embed.url = base_url + frn - embed.description = f"No records found for FRN `{frn}`" - await ctx.send(embed=embed) - return - - table = tables[0] - - table_headers = table[0].find_all("th") - first_header = "".join(table_headers[0].strings) if len(table_headers) > 0 else None - - # catch if the wrong table was selected - if first_header is None or not first_header.startswith("With Licensee"): - embed.title = f"AE7Q History for FRN {frn}" - embed.colour = cmn.colours.bad - embed.url = base_url + frn - embed.description = f"No records found for FRN `{frn}`" - await ctx.send(embed=embed) - return - - table = await process_table(table[2:]) + frn_data = ae7qparser.get_frn(frn) embed = cmn.embed_factory(ctx) - embed.title = f"AE7Q History for FRN {frn}" + embed.title = f"AE7Q FRN History for {frn}" + embed.url = frn_data.query_url embed.colour = cmn.colours.good - embed.url = base_url + frn + embed.description = "" - # add the first three rows of the table to the embed - for row in table[0:3]: - header = f"**{row[0]}** ({row[3]})" # **Callsign** (Applicant Type) - body = (f"Name: *{row[2]}*\n" - f"Class: *{row[4]}*\n" - f"Region: *{row[1]}*\n" - f"Status: *{row[5]}*\n" - f"Granted: *{row[6]}*\n" - f"Effective: *{row[7]}*\n" - f"Cancelled: *{row[8]}*\n" - f"Expires: *{row[9]}*") - embed.add_field(name=header, value=body, inline=False) + if not frn_data.frn_history: + embed.colour = cmn.colours.bad + embed.description += f"\nNo records found for `{frn}`" + else: + # add the first three rows of the FRN history to the embed + for row in frn_data.frn_history[0:3]: + header = f"**{row.callsign}** ({row.applicant_type})" + body = (f"Name: *{row.entity_name}*\n" + f"Region: *{row.region_state}*\n" + f"Operator Class: *{row.operator_class}*\n" + f"Status: *{row.license_status}*\n") + if row.grant_date: + body += f"Granted: *{row.grant_date:%Y-%m-%d}*\n" + if row.effective_date: + body += f"Effective: *{row.effective_date:%Y-%m-%d}*\n" + if row.cancel_date: + body += f"Cancelled: *{row.cancel_date:%Y-%m-%d}*\n" + if row.expire_date: + body += f"Expires: *{row.expire_date:%Y-%m-%d}*\n" + embed.add_field(name=header, value=body, inline=False) - if len(table) > 3: - embed.description = f"Records 1 to 3 of {len(table)}. See ae7q.com for more..." + if len(frn_data.frn_history) > 3: + embed.description += (f"\nRecords 1 to 3 of {len(frn_data.frn_history)}. " + f"See [ae7q.com]({frn_data.query_url}) for more...") await ctx.send(embed=embed) @@ -310,87 +415,43 @@ class AE7QCog(commands.Cog): """Looks up the history of a licensee ID on [ae7q.com](http://ae7q.com/).""" with ctx.typing(): licensee_id = licensee_id.upper() - base_url = "http://ae7q.com/query/data/LicenseeIdHistory.php?ID=" - embed = cmn.embed_factory(ctx) - async with self.session.get(base_url + licensee_id) as resp: - if resp.status != 200: - raise cmn.BotHTTPError(resp) - page = await resp.text() - - soup = BeautifulSoup(page, features="html.parser") - tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] - - if not len(tables): - embed.title = f"AE7Q History for Licensee {licensee_id}" - embed.colour = cmn.colours.bad - embed.url = base_url + licensee_id - embed.description = f"No records found for Licensee `{licensee_id}`" - await ctx.send(embed=embed) - return - - table = tables[0] - - table_headers = table[0].find_all("th") - first_header = "".join(table_headers[0].strings) if len(table_headers) > 0 else None - - # catch if the wrong table was selected - if first_header is None or not first_header.startswith("With FCC"): - embed.title = f"AE7Q History for Licensee {licensee_id}" - embed.colour = cmn.colours.bad - embed.url = base_url + licensee_id - embed.description = f"No records found for Licensee `{licensee_id}`" - await ctx.send(embed=embed) - return - - table = await process_table(table[2:]) + lid_data = ae7qparser.get_licensee_id(licensee_id) embed = cmn.embed_factory(ctx) embed.title = f"AE7Q History for Licensee {licensee_id}" + embed.url = lid_data.query_url embed.colour = cmn.colours.good - embed.url = base_url + licensee_id + embed.description = "" - # add the first three rows of the table to the embed - for row in table[0:3]: - header = f"**{row[0]}** ({row[3]})" # **Callsign** (Applicant Type) - body = (f"Name: *{row[2]}*\n" - f"Class: *{row[4]}*\n" - f"Region: *{row[1]}*\n" - f"Status: *{row[5]}*\n" - f"Granted: *{row[6]}*\n" - f"Effective: *{row[7]}*\n" - f"Cancelled: *{row[8]}*\n" - f"Expires: *{row[9]}*") - embed.add_field(name=header, value=body, inline=False) + if not lid_data.licensee_id_history: + embed.colour = cmn.colours.bad + embed.description += f"\nNo records found for `{licensee_id}`" + else: + # add the first three rows of the FRN history to the embed + for row in lid_data.licensee_id_history[0:3]: + header = f"**{row.callsign}** ({row.applicant_type})" + body = (f"Name: *{row.entity_name}*\n" + f"Region: *{row.region_state}*\n" + f"Operator Class: *{row.operator_class}*\n" + f"Status: *{row.license_status}*\n") + if row.grant_date: + body += f"Granted: *{row.grant_date:%Y-%m-%d}*\n" + if row.effective_date: + body += f"Effective: *{row.effective_date:%Y-%m-%d}*\n" + if row.cancel_date: + body += f"Cancelled: *{row.cancel_date:%Y-%m-%d}*\n" + if row.expire_date: + body += f"Expires: *{row.expire_date:%Y-%m-%d}*\n" - if len(table) > 3: - embed.description = f"Records 1 to 3 of {len(table)}. See ae7q.com for more..." + embed.add_field(name=header, value=body, inline=False) + + if len(lid_data.licensee_id_history) > 3: + embed.description += (f"\nRecords 1 to 3 of {len(lid_data.licensee_id_history)}. " + f"See [ae7q.com]({lid_data.query_url}) for more...") await ctx.send(embed=embed) -async def process_table(table: list): - """Processes tables (*not* including headers) and returns the processed table""" - table_contents = [] - for tr in table: - row = [] - for td in tr.find_all("td"): - cell_val = td.getText().strip() - row.append(cell_val if cell_val else "-") - - # take care of columns that span multiple rows by copying the contents rightward - if "colspan" in td.attrs and int(td.attrs["colspan"]) > 1: - for i in range(int(td.attrs["colspan"]) - 1): - row.append(row[-1]) - - # get rid of ditto marks by copying the contents from the previous row - for i, cell in enumerate(row): - if cell == "\"": - row[i] = table_contents[-1][i] - # add row to table - table_contents += [row] - return table_contents - - def setup(bot: commands.Bot): bot.add_cog(AE7QCog(bot)) diff --git a/requirements.txt b/requirements.txt index 21f069a..a65e2e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ ctyparser beautifulsoup4 lxml pytz +ae7qparser