mirror of
https://github.com/miaowware/qrm2.git
synced 2024-11-22 15:58:40 -05:00
Merge pull request #162 from classabbyamp/ae7q-refactor
refactor ae7q call, add other ae7q query types
This commit is contained in:
commit
ce107f5a82
@ -7,6 +7,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
|
||||
## [Unreleased]
|
||||
### Added
|
||||
- Added Trustee field to qrz command for club callsigns.
|
||||
- Added alias for `ae7q call` command (`ae7q c`).
|
||||
- Added ae7q lookup by FRN and Licensee ID, and for trustee records (`ae7q frn, licensee, trustee`).
|
||||
### Changed
|
||||
- Changelog command to accept a version as argument.
|
||||
- The qrz command can now link to a QRZ page instead of embedding the data with the `--link` flag.
|
||||
|
441
exts/ae7q.py
441
exts/ae7q.py
@ -10,9 +10,8 @@ Test callsigns:
|
||||
KN8U: active, restricted
|
||||
AB2EE: expired, restricted
|
||||
KE8FGB: assigned once, no restrictions
|
||||
NA2AAA: unassigned, no records
|
||||
KC4USA: reserved but has call history
|
||||
WF4EMA: "
|
||||
KV4AAA: unassigned, no records
|
||||
KC4USA: reserved, no call history, *but* has application history
|
||||
"""
|
||||
|
||||
import discord.ext.commands as commands
|
||||
@ -34,110 +33,380 @@ class AE7QCog(commands.Cog):
|
||||
if ctx.invoked_subcommand is None:
|
||||
await ctx.send_help(ctx.command)
|
||||
|
||||
@_ae7q_lookup.command(name="call", category=cmn.cat.lookup)
|
||||
@_ae7q_lookup.command(name="call", aliases=["c"], category=cmn.cat.lookup)
|
||||
async def _ae7q_call(self, ctx: commands.Context, callsign: str):
|
||||
'''Look up the history for a callsign on [ae7q.com](http://ae7q.com/).'''
|
||||
callsign = callsign.upper()
|
||||
desc = ''
|
||||
base_url = "http://ae7q.com/query/data/CallHistory.php?CALL="
|
||||
embed = cmn.embed_factory(ctx)
|
||||
'''Look up the history of a callsign on [ae7q.com](http://ae7q.com/).'''
|
||||
with ctx.typing():
|
||||
callsign = callsign.upper()
|
||||
desc = ''
|
||||
base_url = "http://ae7q.com/query/data/CallHistory.php?CALL="
|
||||
embed = cmn.embed_factory(ctx)
|
||||
|
||||
async with self.session.get(base_url + callsign) as resp:
|
||||
if resp.status != 200:
|
||||
embed.title = "Error in AE7Q call command"
|
||||
embed.description = 'Could not load AE7Q'
|
||||
embed.colour = cmn.colours.bad
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
page = await resp.text()
|
||||
async with self.session.get(base_url + callsign) as resp:
|
||||
if resp.status != 200:
|
||||
embed.title = "Error in AE7Q call command"
|
||||
embed.description = 'Could not load AE7Q'
|
||||
embed.colour = cmn.colours.bad
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
page = await resp.text()
|
||||
|
||||
soup = BeautifulSoup(page, features="html.parser")
|
||||
tables = soup.select("table.Database")
|
||||
soup = BeautifulSoup(page, features="html.parser")
|
||||
tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")]
|
||||
|
||||
for table in tables:
|
||||
rows = table.find_all("tr")
|
||||
if len(rows) > 1 and len(rows[0]) > 1:
|
||||
break
|
||||
if desc == '':
|
||||
for row in rows:
|
||||
table = tables[0]
|
||||
|
||||
# find the first table in the page, and use it to make a description
|
||||
if len(table[0]) == 1:
|
||||
for row in table:
|
||||
desc += " ".join(row.getText().split())
|
||||
desc += '\n'
|
||||
desc = desc.replace(callsign, f'`{callsign}`')
|
||||
rows = None
|
||||
table = tables[1]
|
||||
|
||||
first_header = ''.join(rows[0].find_all("th")[0].strings)
|
||||
table_headers = table[0].find_all("th")
|
||||
first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None
|
||||
|
||||
if rows is None or first_header != 'Entity Name':
|
||||
# catch if the wrong table was selected
|
||||
if first_header is None or first_header != 'Entity Name':
|
||||
embed.title = f"AE7Q History for {callsign}"
|
||||
embed.colour = cmn.colours.bad
|
||||
embed.url = base_url + callsign
|
||||
embed.description = desc
|
||||
embed.description += f'\nNo records found for `{callsign}`'
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
|
||||
table = await process_table(table[1:])
|
||||
|
||||
embed = cmn.embed_factory(ctx)
|
||||
embed.title = f"AE7Q History for {callsign}"
|
||||
embed.colour = cmn.colours.bad
|
||||
embed.url = f"{base_url}{callsign}"
|
||||
embed.colour = cmn.colours.good
|
||||
embed.url = base_url + callsign
|
||||
|
||||
# add the first three rows of the table to the embed
|
||||
for row in table[0:3]:
|
||||
header = f'**{row[0]}** ({row[1]})' # **Name** (Applicant Type)
|
||||
body = (f'Class: *{row[2]}*\n'
|
||||
f'Region: *{row[3]}*\n'
|
||||
f'Status: *{row[4]}*\n'
|
||||
f'Granted: *{row[5]}*\n'
|
||||
f'Effective: *{row[6]}*\n'
|
||||
f'Cancelled: *{row[7]}*\n'
|
||||
f'Expires: *{row[8]}*')
|
||||
embed.add_field(name=header, value=body, inline=False)
|
||||
|
||||
if len(table) > 3:
|
||||
desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...'
|
||||
|
||||
embed.description = desc
|
||||
embed.description += f'\nNo records found for `{callsign}`'
|
||||
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
|
||||
table_contents = [] # store your table here
|
||||
for tr in rows:
|
||||
if rows.index(tr) == 0:
|
||||
# first_header = ''.join(tr.find_all("th")[0].strings)
|
||||
# if first_header == 'Entity Name':
|
||||
# print('yooooo')
|
||||
continue
|
||||
row_cells = []
|
||||
for td in tr.find_all('td'):
|
||||
if td.getText().strip() != '':
|
||||
row_cells.append(td.getText().strip())
|
||||
else:
|
||||
row_cells.append('-')
|
||||
if 'colspan' in td.attrs and int(td.attrs['colspan']) > 1:
|
||||
for i in range(int(td.attrs['colspan']) - 1):
|
||||
row_cells.append(row_cells[-1])
|
||||
for i, cell in enumerate(row_cells):
|
||||
if cell == '"':
|
||||
row_cells[i] = table_contents[-1][i]
|
||||
if len(row_cells) > 1:
|
||||
table_contents += [row_cells]
|
||||
@_ae7q_lookup.command(name="trustee", aliases=["t"], category=cmn.cat.lookup)
|
||||
async def _ae7q_trustee(self, ctx: commands.Context, callsign: str):
|
||||
'''Look up the licenses for which a licensee is trustee on [ae7q.com](http://ae7q.com/).'''
|
||||
with ctx.typing():
|
||||
callsign = callsign.upper()
|
||||
desc = ''
|
||||
base_url = "http://ae7q.com/query/data/CallHistory.php?CALL="
|
||||
embed = cmn.embed_factory(ctx)
|
||||
|
||||
embed = cmn.embed_factory(ctx)
|
||||
embed.title = f"AE7Q Records for {callsign}"
|
||||
embed.colour = cmn.colours.good
|
||||
embed.url = f"{base_url}{callsign}"
|
||||
async with self.session.get(base_url + callsign) as resp:
|
||||
if resp.status != 200:
|
||||
embed.title = "Error in AE7Q trustee command"
|
||||
embed.description = 'Could not load AE7Q'
|
||||
embed.colour = cmn.colours.bad
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
page = await resp.text()
|
||||
|
||||
for row in table_contents[0:3]:
|
||||
header = f'**{row[0]}** ({row[1]})'
|
||||
body = (f'Class: *{row[2]}*\n'
|
||||
f'Region: *{row[3]}*\n'
|
||||
f'Status: *{row[4]}*\n'
|
||||
f'Granted: *{row[5]}*\n'
|
||||
f'Effective: *{row[6]}*\n'
|
||||
f'Cancelled: *{row[7]}*\n'
|
||||
f'Expires: *{row[8]}*')
|
||||
embed.add_field(name=header, value=body, inline=False)
|
||||
soup = BeautifulSoup(page, features="html.parser")
|
||||
tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")]
|
||||
|
||||
embed.description = desc
|
||||
if len(table_contents) > 3:
|
||||
embed.description += f'\nRecords 1 to 3 of {len(table_contents)}. See ae7q.com for more...'
|
||||
try:
|
||||
table = tables[2] if len(tables[0][0]) == 1 else tables[1]
|
||||
except IndexError:
|
||||
embed.title = f"AE7Q Trustee History for {callsign}"
|
||||
embed.colour = cmn.colours.bad
|
||||
embed.url = base_url + callsign
|
||||
embed.description = desc
|
||||
embed.description += f'\nNo records found for `{callsign}`'
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
|
||||
await ctx.send(embed=embed)
|
||||
table_headers = table[0].find_all("th")
|
||||
first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None
|
||||
|
||||
# TODO: write commands for other AE7Q response types?
|
||||
# @_ae7q_lookup.command(name="trustee")
|
||||
# async def _ae7q_trustee(self, ctx: commands.Context, callsign: str):
|
||||
# pass
|
||||
# catch if the wrong table was selected
|
||||
if first_header is None or not first_header.startswith("With"):
|
||||
embed.title = f"AE7Q Trustee History for {callsign}"
|
||||
embed.colour = cmn.colours.bad
|
||||
embed.url = base_url + callsign
|
||||
embed.description = desc
|
||||
embed.description += f'\nNo records found for `{callsign}`'
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
|
||||
# @_ae7q_lookup.command(name="applications", aliases=['apps'])
|
||||
# async def _ae7q_applications(self, ctx: commands.Context, callsign: str):
|
||||
# pass
|
||||
table = await process_table(table[2:])
|
||||
|
||||
# @_ae7q_lookup.command(name="frn")
|
||||
# async def _ae7q_frn(self, ctx: commands.Context, frn: str):
|
||||
# base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN="
|
||||
# pass
|
||||
embed = cmn.embed_factory(ctx)
|
||||
embed.title = f"AE7Q Trustee History for {callsign}"
|
||||
embed.colour = cmn.colours.good
|
||||
embed.url = base_url + callsign
|
||||
|
||||
# @_ae7q_lookup.command(name="licensee", aliases=["lic"])
|
||||
# async def _ae7q_licensee(self, ctx: commands.Context, frn: str):
|
||||
# base_url = "http://ae7q.com/query/data/LicenseeIdHistory.php?ID="
|
||||
# pass
|
||||
# add the first three rows of the table to the embed
|
||||
for row in table[0:3]:
|
||||
header = f'**{row[0]}** ({row[3]})' # **Name** (Applicant Type)
|
||||
body = (f'Name: *{row[2]}*\n'
|
||||
f'Region: *{row[1]}*\n'
|
||||
f'Status: *{row[4]}*\n'
|
||||
f'Granted: *{row[5]}*\n'
|
||||
f'Effective: *{row[6]}*\n'
|
||||
f'Cancelled: *{row[7]}*\n'
|
||||
f'Expires: *{row[8]}*')
|
||||
embed.add_field(name=header, value=body, inline=False)
|
||||
|
||||
if len(table) > 3:
|
||||
desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...'
|
||||
|
||||
embed.description = desc
|
||||
|
||||
await ctx.send(embed=embed)
|
||||
|
||||
@_ae7q_lookup.command(name="applications", aliases=["a"], category=cmn.cat.lookup)
|
||||
async def _ae7q_applications(self, ctx: commands.Context, callsign: str):
|
||||
'''Look up the application history for a callsign on [ae7q.com](http://ae7q.com/).'''
|
||||
"""
|
||||
with ctx.typing():
|
||||
callsign = callsign.upper()
|
||||
desc = ''
|
||||
base_url = "http://ae7q.com/query/data/CallHistory.php?CALL="
|
||||
embed = cmn.embed_factory(ctx)
|
||||
|
||||
async with self.session.get(base_url + callsign) as resp:
|
||||
if resp.status != 200:
|
||||
embed.title = "Error in AE7Q applications command"
|
||||
embed.description = 'Could not load AE7Q'
|
||||
embed.colour = cmn.colours.bad
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
page = await resp.text()
|
||||
|
||||
soup = BeautifulSoup(page, features="html.parser")
|
||||
tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")]
|
||||
|
||||
table = tables[0]
|
||||
|
||||
# find the first table in the page, and use it to make a description
|
||||
if len(table[0]) == 1:
|
||||
for row in table:
|
||||
desc += " ".join(row.getText().split())
|
||||
desc += '\n'
|
||||
desc = desc.replace(callsign, f'`{callsign}`')
|
||||
|
||||
# select the last table to get applications
|
||||
table = tables[-1]
|
||||
|
||||
table_headers = table[0].find_all("th")
|
||||
first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None
|
||||
|
||||
# catch if the wrong table was selected
|
||||
if first_header is None or not first_header.startswith("Receipt"):
|
||||
embed.title = f"AE7Q Application History for {callsign}"
|
||||
embed.colour = cmn.colours.bad
|
||||
embed.url = base_url + callsign
|
||||
embed.description = desc
|
||||
embed.description += f'\nNo records found for `{callsign}`'
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
|
||||
table = await process_table(table[1:])
|
||||
|
||||
embed = cmn.embed_factory(ctx)
|
||||
embed.title = f"AE7Q Application History for {callsign}"
|
||||
embed.colour = cmn.colours.good
|
||||
embed.url = base_url + callsign
|
||||
|
||||
# add the first three rows of the table to the embed
|
||||
for row in table[0:3]:
|
||||
header = f'**{row[1]}** ({row[3]})' # **Name** (Callsign)
|
||||
body = (f'Received: *{row[0]}*\n'
|
||||
f'Region: *{row[2]}*\n'
|
||||
f'Purpose: *{row[5]}*\n'
|
||||
f'Last Action: *{row[7]}*\n'
|
||||
f'Application Status: *{row[8]}*\n')
|
||||
embed.add_field(name=header, value=body, inline=False)
|
||||
|
||||
if len(table) > 3:
|
||||
desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...'
|
||||
|
||||
embed.description = desc
|
||||
|
||||
await ctx.send(embed=embed)
|
||||
"""
|
||||
pass
|
||||
|
||||
@_ae7q_lookup.command(name="frn", aliases=["f"], category=cmn.cat.lookup)
|
||||
async def _ae7q_frn(self, ctx: commands.Context, frn: str):
|
||||
'''Look up the history of an FRN on [ae7q.com](http://ae7q.com/).'''
|
||||
"""
|
||||
NOTES:
|
||||
- 2 tables: callsign history and application history
|
||||
- If not found: no tables
|
||||
"""
|
||||
with ctx.typing():
|
||||
base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN="
|
||||
embed = cmn.embed_factory(ctx)
|
||||
|
||||
async with self.session.get(base_url + frn) as resp:
|
||||
if resp.status != 200:
|
||||
embed.title = "Error in AE7Q frn command"
|
||||
embed.description = 'Could not load AE7Q'
|
||||
embed.colour = cmn.colours.bad
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
page = await resp.text()
|
||||
|
||||
soup = BeautifulSoup(page, features="html.parser")
|
||||
tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")]
|
||||
|
||||
if not len(tables):
|
||||
embed.title = f"AE7Q History for FRN {frn}"
|
||||
embed.colour = cmn.colours.bad
|
||||
embed.url = base_url + frn
|
||||
embed.description = f'No records found for FRN `{frn}`'
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
|
||||
table = tables[0]
|
||||
|
||||
table_headers = table[0].find_all("th")
|
||||
first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None
|
||||
|
||||
# catch if the wrong table was selected
|
||||
if first_header is None or not first_header.startswith('With Licensee'):
|
||||
embed.title = f"AE7Q History for FRN {frn}"
|
||||
embed.colour = cmn.colours.bad
|
||||
embed.url = base_url + frn
|
||||
embed.description = f'No records found for FRN `{frn}`'
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
|
||||
table = await process_table(table[2:])
|
||||
|
||||
embed = cmn.embed_factory(ctx)
|
||||
embed.title = f"AE7Q History for FRN {frn}"
|
||||
embed.colour = cmn.colours.good
|
||||
embed.url = base_url + frn
|
||||
|
||||
# add the first three rows of the table to the embed
|
||||
for row in table[0:3]:
|
||||
header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type)
|
||||
body = (f'Name: *{row[2]}*\n'
|
||||
f'Class: *{row[4]}*\n'
|
||||
f'Region: *{row[1]}*\n'
|
||||
f'Status: *{row[5]}*\n'
|
||||
f'Granted: *{row[6]}*\n'
|
||||
f'Effective: *{row[7]}*\n'
|
||||
f'Cancelled: *{row[8]}*\n'
|
||||
f'Expires: *{row[9]}*')
|
||||
embed.add_field(name=header, value=body, inline=False)
|
||||
|
||||
if len(table) > 3:
|
||||
embed.description = f'Records 1 to 3 of {len(table)}. See ae7q.com for more...'
|
||||
|
||||
await ctx.send(embed=embed)
|
||||
|
||||
@_ae7q_lookup.command(name="licensee", aliases=["l"], category=cmn.cat.lookup)
|
||||
async def _ae7q_licensee(self, ctx: commands.Context, licensee_id: str):
|
||||
'''Look up the history of a licensee ID on [ae7q.com](http://ae7q.com/).'''
|
||||
with ctx.typing():
|
||||
licensee_id = licensee_id.upper()
|
||||
base_url = "http://ae7q.com/query/data/LicenseeIdHistory.php?ID="
|
||||
embed = cmn.embed_factory(ctx)
|
||||
|
||||
async with self.session.get(base_url + licensee_id) as resp:
|
||||
if resp.status != 200:
|
||||
embed.title = "Error in AE7Q licensee command"
|
||||
embed.description = 'Could not load AE7Q'
|
||||
embed.colour = cmn.colours.bad
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
page = await resp.text()
|
||||
|
||||
soup = BeautifulSoup(page, features="html.parser")
|
||||
tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")]
|
||||
|
||||
if not len(tables):
|
||||
embed.title = f"AE7Q History for Licensee {licensee_id}"
|
||||
embed.colour = cmn.colours.bad
|
||||
embed.url = base_url + licensee_id
|
||||
embed.description = f'No records found for Licensee `{licensee_id}`'
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
|
||||
table = tables[0]
|
||||
|
||||
table_headers = table[0].find_all("th")
|
||||
first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None
|
||||
|
||||
# catch if the wrong table was selected
|
||||
if first_header is None or not first_header.startswith('With FCC'):
|
||||
embed.title = f"AE7Q History for Licensee {licensee_id}"
|
||||
embed.colour = cmn.colours.bad
|
||||
embed.url = base_url + licensee_id
|
||||
embed.description = f'No records found for Licensee `{licensee_id}`'
|
||||
await ctx.send(embed=embed)
|
||||
return
|
||||
|
||||
table = await process_table(table[2:])
|
||||
|
||||
embed = cmn.embed_factory(ctx)
|
||||
embed.title = f"AE7Q History for Licensee {licensee_id}"
|
||||
embed.colour = cmn.colours.good
|
||||
embed.url = base_url + licensee_id
|
||||
|
||||
# add the first three rows of the table to the embed
|
||||
for row in table[0:3]:
|
||||
header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type)
|
||||
body = (f'Name: *{row[2]}*\n'
|
||||
f'Class: *{row[4]}*\n'
|
||||
f'Region: *{row[1]}*\n'
|
||||
f'Status: *{row[5]}*\n'
|
||||
f'Granted: *{row[6]}*\n'
|
||||
f'Effective: *{row[7]}*\n'
|
||||
f'Cancelled: *{row[8]}*\n'
|
||||
f'Expires: *{row[9]}*')
|
||||
embed.add_field(name=header, value=body, inline=False)
|
||||
|
||||
if len(table) > 3:
|
||||
embed.description = f'Records 1 to 3 of {len(table)}. See ae7q.com for more...'
|
||||
|
||||
await ctx.send(embed=embed)
|
||||
|
||||
|
||||
async def process_table(table: list):
|
||||
"""Processes tables (*not* including headers) and returns the processed table"""
|
||||
table_contents = []
|
||||
for tr in table:
|
||||
row = []
|
||||
for td in tr.find_all('td'):
|
||||
cell_val = td.getText().strip()
|
||||
row.append(cell_val if cell_val else '-')
|
||||
|
||||
# take care of columns that span multiple rows by copying the contents rightward
|
||||
if 'colspan' in td.attrs and int(td.attrs['colspan']) > 1:
|
||||
for i in range(int(td.attrs['colspan']) - 1):
|
||||
row.append(row[-1])
|
||||
|
||||
# get rid of ditto marks by copying the contents from the previous row
|
||||
for i, cell in enumerate(row):
|
||||
if cell == "\"":
|
||||
row[i] = table_contents[-1][i]
|
||||
# add row to table
|
||||
table_contents += [row]
|
||||
return table_contents
|
||||
|
||||
|
||||
def setup(bot: commands.Bot):
|
||||
|
Loading…
Reference in New Issue
Block a user