Clean up search engine

Notable changes:
1. Prevent excessive engine module imports.
2. Replace trivial usage of `join()`.
3. Keep the output text sorted whenever possible.
4. Close handles properly.
5. Print error to stderr, not stdout.
6. Report search job exit code.
7. Print exception message to stderr if exception was thrown when
   running a search job.
8. Utilize XML library to build XML data
   And use 2 spaces as indentation.

PR #21098.
This commit is contained in:
Chocobo1
2024-07-22 16:51:57 +08:00
committed by GitHub
parent 3c5baac150
commit 69a829dfb0
4 changed files with 168 additions and 198 deletions

View File

@@ -1,4 +1,4 @@
#VERSION: 1.46
#VERSION: 1.47
# Author:
# Fabien Devaux <fab AT gnux DOT info>
@@ -36,13 +36,15 @@
import importlib
import pathlib
import sys
import traceback
import urllib.parse
from collections.abc import Iterable, Iterator, Sequence
import xml.etree.ElementTree as ET
from collections.abc import Iterable
from enum import Enum
from glob import glob
from multiprocessing import Pool, cpu_count
from os import path
from typing import Dict, List, Optional, Set, Tuple, Type
from typing import Optional
THREADED: bool = True
try:
@@ -50,7 +52,7 @@ try:
except NotImplementedError:
MAX_THREADS = 1
Category = Enum('Category', ['all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pictures', 'books'])
Category = Enum('Category', ['all', 'anime', 'books', 'games', 'movies', 'music', 'pictures', 'software', 'tv'])
################################################################################
@@ -62,13 +64,13 @@ Category = Enum('Category', ['all', 'movies', 'tv', 'music', 'games', 'anime', '
################################################################################
EngineName = str
EngineModuleName = str # the filename of the engine plugin
class Engine:
url: str
name: EngineName
supported_categories: Dict[str, str]
name: str
supported_categories: dict[str, str]
def __init__(self) -> None:
pass
@@ -81,112 +83,89 @@ class Engine:
# global state
engine_dict: Dict[EngineName, Optional[Type[Engine]]] = {}
engine_dict: dict[EngineModuleName, Optional[type[Engine]]] = {}
def list_engines() -> List[EngineName]:
def list_engines() -> list[EngineModuleName]:
""" List all engines,
including broken engines that fail on import
including broken engines that would fail on import
Faster than initialize_engines
Return list of all engines
Return list of all engines' module name
"""
found_engines = []
names = []
for engine_path in glob(path.join(path.dirname(__file__), 'engines', '*.py')):
engine_name = path.basename(engine_path).split('.')[0].strip()
if len(engine_name) == 0 or engine_name.startswith('_'):
engine_module_name = path.basename(engine_path).split('.')[0].strip()
if len(engine_module_name) == 0 or engine_module_name.startswith('_'):
continue
found_engines.append(engine_name)
names.append(engine_module_name)
return found_engines
return sorted(names)
def get_engine(engine_name: EngineName) -> Optional[Type[Engine]]:
if engine_name in engine_dict:
return engine_dict[engine_name]
def import_engine(engine_module_name: EngineModuleName) -> Optional[type[Engine]]:
if engine_module_name in engine_dict:
return engine_dict[engine_module_name]
# when import fails, engine is None
engine = None
# when import fails, return `None`
engine_class = None
try:
# import engines.[engine]
engine_module = importlib.import_module("engines." + engine_name)
engine = getattr(engine_module, engine_name)
# import engines.[engine_module_name]
engine_module = importlib.import_module(f"engines.{engine_module_name}")
engine_class = getattr(engine_module, engine_module_name)
except Exception:
pass
engine_dict[engine_name] = engine
return engine
engine_dict[engine_module_name] = engine_class
return engine_class
def initialize_engines(found_engines: Iterable[EngineName]) -> Set[EngineName]:
""" Import available engines
Return set of available engines
def get_capabilities(engines: Iterable[EngineModuleName]) -> str:
"""
supported_engines = set()
for engine_name in found_engines:
# import engine
engine = get_engine(engine_name)
if engine is None:
continue
supported_engines.add(engine_name)
return supported_engines
def engines_to_xml(supported_engines: Iterable[EngineName]) -> Iterator[str]:
""" Generates xml for supported engines """
tab = " " * 4
for engine_name in supported_engines:
search_engine = get_engine(engine_name)
if search_engine is None:
continue
supported_categories = ""
if hasattr(search_engine, "supported_categories"):
supported_categories = " ".join((key
for key in search_engine.supported_categories.keys()
if key != Category.all.name))
yield "".join((tab, "<", engine_name, ">\n",
tab, tab, "<name>", search_engine.name, "</name>\n",
tab, tab, "<url>", search_engine.url, "</url>\n",
tab, tab, "<categories>", supported_categories, "</categories>\n",
tab, "</", engine_name, ">\n"))
def displayCapabilities(supported_engines: Iterable[EngineName]) -> None:
"""
Display capabilities in XML format
Return capabilities in XML format
<capabilities>
<engine_short_name>
<engine_module_name>
<name>long name</name>
<url>http://example.com</url>
<categories>movies music games</categories>
</engine_short_name>
</engine_module_name>
</capabilities>
"""
xml = "".join(("<capabilities>\n",
"".join(engines_to_xml(supported_engines)),
"</capabilities>"))
print(xml)
capabilities_element = ET.Element('capabilities')
for engine_module_name in engines:
engine_class = import_engine(engine_module_name)
if engine_class is None:
continue
engine_module_element = ET.SubElement(capabilities_element, engine_module_name)
ET.SubElement(engine_module_element, 'name').text = engine_class.name
ET.SubElement(engine_module_element, 'url').text = engine_class.url
supported_categories = ""
if hasattr(engine_class, "supported_categories"):
supported_categories = " ".join((key
for key in sorted(engine_class.supported_categories.keys())
if key != Category.all.name))
ET.SubElement(engine_module_element, 'categories').text = supported_categories
ET.indent(capabilities_element)
return ET.tostring(capabilities_element, 'unicode')
def run_search(engine_list: Tuple[Optional[Type[Engine]], str, Category]) -> bool:
def run_search(search_params: tuple[type[Engine], str, Category]) -> bool:
""" Run search in engine
@param engine_list Tuple with engine, query and category
@param search_params Tuple with engine, query and category
@retval False if any exceptions occurred
@retval True otherwise
"""
engine_class, what, cat = engine_list
if engine_class is None:
return False
engine_class, what, cat = search_params
try:
engine = engine_class()
# avoid exceptions due to invalid category
@@ -195,73 +174,65 @@ def run_search(engine_list: Tuple[Optional[Type[Engine]], str, Category]) -> boo
engine.search(what, cat.name)
else:
engine.search(what)
return True
except Exception:
traceback.print_exc()
return False
def main(args: Sequence[str]) -> None:
# qbt tend to run this script in 'isolate mode' so append the current path manually
current_path = str(pathlib.Path(__file__).parent.resolve())
if current_path not in sys.path:
sys.path.append(current_path)
found_engines = list_engines()
def show_usage() -> None:
print("./nova2.py all|engine1[,engine2]* <category> <keywords>", file=sys.stderr)
print("found engines: " + ','.join(found_engines), file=sys.stderr)
print("to list available engines: ./nova2.py --capabilities [--names]", file=sys.stderr)
if not args:
show_usage()
sys.exit(1)
elif args[0] == "--capabilities":
supported_engines = initialize_engines(found_engines)
if "--names" in args:
print(",".join(supported_engines))
return
displayCapabilities(supported_engines)
return
elif len(args) < 3:
show_usage()
sys.exit(1)
cat = args[1].lower()
try:
category = Category[cat]
except KeyError:
print(" - ".join(('Invalid category', cat)), file=sys.stderr)
sys.exit(1)
# get only unique engines with set
engines_list = set(e.lower() for e in args[0].strip().split(','))
if not engines_list:
# engine list is empty. Nothing to do here
return
if 'all' in engines_list:
# use all supported engines
# note: this can be slower than passing a list of supported engines
# because initialize_engines will also try to import not-supported engines
engines_list = initialize_engines(found_engines)
else:
# discard not-found engines
engines_list = {engine for engine in engines_list if engine in found_engines}
what = urllib.parse.quote(' '.join(args[2:]))
params = ((get_engine(engine_name), what, category) for engine_name in engines_list)
if THREADED:
# child process spawning is controlled min(number of searches, number of cpu)
with Pool(min(len(engines_list), MAX_THREADS)) as pool:
pool.map(run_search, params)
else:
# py3 note: map is needed to be evaluated for content to be executed
all(map(run_search, params))
if __name__ == "__main__":
main(sys.argv[1:])
def main() -> int:
# qbt tend to run this script in 'isolate mode' so append the current path manually
current_path = str(pathlib.Path(__file__).parent.resolve())
if current_path not in sys.path:
sys.path.append(current_path)
# https://docs.python.org/3/library/sys.html#sys.exit
class ExitCode(Enum):
OK = 0
AppError = 1
ArgError = 2
found_engines = list_engines()
prog_name = sys.argv[0]
prog_usage = (f"Usage: {prog_name} all|engine1[,engine2]* <category> <keywords>\n"
f"To list available engines: {prog_name} --capabilities [--names]\n"
f"Found engines: {','.join(found_engines)}")
if "--capabilities" in sys.argv:
if "--names" in sys.argv:
print(",".join((e for e in found_engines if import_engine(e) is not None)))
return ExitCode.OK.value
print(get_capabilities(found_engines))
return ExitCode.OK.value
elif len(sys.argv) < 4:
print(prog_usage, file=sys.stderr)
return ExitCode.ArgError.value
# get unique engines
engs = set(arg.strip().lower() for arg in sys.argv[1].split(','))
engines = found_engines if 'all' in engs else [e for e in found_engines if e in engs]
cat = sys.argv[2].lower()
try:
category = Category[cat]
except KeyError:
print(f"Invalid category: {cat}", file=sys.stderr)
return ExitCode.ArgError.value
what = urllib.parse.quote(' '.join(sys.argv[3:]))
params = ((engine_class, what, category) for e in engines if (engine_class := import_engine(e)) is not None)
search_success = False
if THREADED:
processes = max(min(len(engines), MAX_THREADS), 1)
with Pool(processes) as pool:
search_success = all(pool.map(run_search, params))
else:
search_success = all(map(run_search, params))
return ExitCode.OK.value if search_success else ExitCode.AppError.value
sys.exit(main())