mirror of
https://github.com/miaowware/qrm2.git
synced 2024-11-29 11:18:51 -05:00
89 lines
3.3 KiB
Python
89 lines
3.3 KiB
Python
"""
Resources manager for qrm2.
---
Copyright (C) 2021-2023 classabbyamp, 0x5c

SPDX-License-Identifier: LiLiQ-Rplus-1.1
"""
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from utils.resources_models import Index
|
|
|
|
|
|
class ResourcesManager:
    """Keeps a local copy of the remote resource files listed in an index.

    On construction the manager downloads ``index.json`` from ``url``, stores
    it in ``basedir`` and then downloads every file the index lists for the
    requested resource versions. When a download fails, a previously
    downloaded copy is used if one exists; otherwise the process exits.

    Attributes:
        basedir: Directory holding the local copies.
        url: Base URL; file names are appended to it verbatim.
        versions: Mapping of resource name to the version to use.
        index: The parsed index (freshly fetched or the cached fallback).
    """

    def __init__(self, basedir: Path, url: str, versions: dict):
        self.basedir = basedir
        self.url = url
        self.versions = versions
        # Performs network and disk I/O; may raise SystemExit (see sync_start).
        self.index: Index = self.sync_start(basedir)

    def parse_index(self, index: str):
        """Parses the index.

        Args:
            index: JSON text of the index. The raw bytes of an HTTP response
                or of the cached file are passed here as well.

        Returns:
            The result of ``Index.model_validate_json(index)``.
        """
        return Index.model_validate_json(index)

    def sync_fetch(self, filepath: str):
        """Fetches files in sync mode.

        Args:
            filepath: Path appended to ``self.url`` to build the request URL.

        Returns:
            The response body as bytes.

        Raises:
            httpx.HTTPStatusError: The server answered with a 4xx/5xx status.
            httpx.RequestError: The request itself failed.
        """
        self.print_msg(f"Fetching {filepath}", "sync")
        resp = httpx.get(self.url + filepath)
        try:
            resp.raise_for_status()
            return resp.content
        finally:
            # Close the response even when raise_for_status() raises.
            resp.close()

    def sync_start(self, basedir: Path) -> "Index":
        """Takes care of constructing the local resources repository and initialising the RM.

        Args:
            basedir: Directory in which the index and resource files are stored.

        Returns:
            The freshly downloaded index, or the cached one when the index
            could not be fetched or stored but a complete old copy exists.

        Raises:
            SystemExit: No usable local copy exists for something that
                could not be downloaded.
            FileExistsError: ``basedir`` exists but is a regular file.
        """
        self.print_msg("Initialising ResourceManager", "sync")
        self.ensure_dir(basedir)
        index_path = basedir / "index.json"
        try:
            raw = self.sync_fetch("index.json")
            # Validate before writing so a broken download never replaces a good cached index.
            new_index: Index = self.parse_index(raw)
            with index_path.open("wb") as index_file:
                index_file.write(raw)
        # httpx.HTTPError is the common base of RequestError and of the
        # HTTPStatusError raised by raise_for_status(); catching only
        # RequestError let 4xx/5xx responses bypass the fallback below.
        except (httpx.HTTPError, OSError) as ex:
            self.print_msg(f"There was an issue fetching the index: {ex.__class__.__name__}: {ex}", "sync")
            if index_path.exists():
                self.print_msg("Old file exist, using old resources", "fallback")
                # The cache was written as raw bytes; read it back the same
                # way instead of decoding with the locale's default encoding.
                old_index = self.parse_index(index_path.read_bytes())
                for res, ver in self.versions.items():
                    for entry in old_index[res][ver]:
                        if not (basedir / entry.filename).exists():
                            self.print_msg(f"Error: {entry.filename} is missing", "fallback")
                            raise SystemExit(1)
                return old_index
            raise SystemExit(1)

        for res, ver in self.versions.items():
            for entry in new_index[res][ver]:
                target = basedir / entry.filename
                try:
                    # Download first: opening the target with "wb" truncates
                    # it, so opening before a failed download would destroy
                    # the old copy the fallback below relies on.
                    data = self.sync_fetch(entry.filename)
                    with target.open("wb") as out:
                        out.write(data)
                except (httpx.HTTPError, OSError) as ex:
                    ex_cls = ex.__class__.__name__
                    self.print_msg(f"There was an issue fetching {entry.filename}: {ex_cls}: {ex}", "sync")
                    if not target.exists():
                        raise SystemExit(1)
                    self.print_msg("Old file exists, using it", "fallback")
        return new_index

    def ensure_dir(self, basedir: Path) -> bool:
        """Ensures that the resources/ directory is present. Creates as necessary.

        Args:
            basedir: Directory that must exist.

        Returns:
            True if the directory had to be created, False if the path already existed.

        Raises:
            FileExistsError: ``basedir`` exists and is a regular file.
        """
        if basedir.is_file():
            raise FileExistsError(f"'{basedir}' is not a directory!")
        if not basedir.exists():
            self.print_msg("Creating resources directory")
            # parents=True so a nested, not-yet-existing path also works.
            basedir.mkdir(parents=True)
            return True
        return False

    def print_msg(self, msg: str, mode: str = None):
        """Formats and prints messages for the resources manager.

        Args:
            msg: Text of the message.
            mode: Optional tag appended in parentheses; omitted when falsy.
        """
        message = f"RM: {msg}"
        if mode:
            message += f" ({mode})"
        print(message)