From 1b73693b377fa48d88b5980a09482d3a838370c1 Mon Sep 17 00:00:00 2001
From: JustAnotherArchivist
Date: Fri, 26 Jun 2020 22:12:58 +0000
Subject: [PATCH] Keep track of and handle errors in modules via metaclass

---
Reviewer notes (placed after the separator above, so they are not part of the
commit message and do not affect the hunks below):

* NOTE(review): this copy of the patch was recovered from a paste whose line
  breaks and indentation had been collapsed. Line boundaries were rebuilt from
  the hunk headers; tab indentation and the nesting of `return class_` in
  get_module_by_name (placed inside the `if`, so that unknown names return
  None as the `else` branch in get_module_class expects) are assumptions.
  Verify against the original commit before applying.
* iter_modules() yields while iterating cls.__modulesByName.items() directly.
  Its own comment says consumers may drop modules mid-iteration, and drop() /
  __del__ delete keys from that same dict; in CPython, changing a dict's size
  during iteration raises RuntimeError. Consider iterating over a
  list(...items()) snapshot in the second loop as well.
* drop() and __del__ remove the entry by name only, without checking that the
  stored weak reference still points at the class being removed. After
  drop(A) and registration of a new class B under the same name, A's
  finalizer would delete B's entry.
* __new__ accepts an empty string as a name (''.strip(...) == ''), although the
  Module docstring describes names as a-z, underscores, and hyphens.
* A subclass that inherits `name` from an already-registered parent raises
  "Class name collision" at class-creation time, because class_.name is read
  via normal attribute lookup. Presumably intended (the removed code also
  rejected duplicate names), but please confirm.
* Module discovery messages move from logger.debug to logger.info.

 codearchiver/core.py | 86 ++++++++++++++++++++++++++++++--------------
 1 file changed, 59 insertions(+), 27 deletions(-)

diff --git a/codearchiver/core.py b/codearchiver/core.py
index 6437583..5b18684 100644
--- a/codearchiver/core.py
+++ b/codearchiver/core.py
@@ -8,6 +8,7 @@ import queue
 import requests
 import time
 import typing
+import weakref
 
 
 logger = logging.getLogger(__name__)
@@ -125,11 +126,65 @@ class HttpClient:
 		return self.request('POST', *args, **kwargs)
 
 
-class Module:
+class ModuleMeta(type):
+	__modulesByName = {} # name -> Module class
+
+	def __new__(cls, *args, **kwargs):
+		class_ = super().__new__(cls, *args, **kwargs)
+		if class_.name is not None:
+			if class_.name.strip('abcdefghijklmnopqrstuvwxyz_-') != '':
+				raise RuntimeError(f'Invalid class name: {class_.name!r}')
+			if class_.name in cls.__modulesByName:
+				raise RuntimeError(f'Class name collision: {class_.name!r} is already known')
+			cls.__modulesByName[class_.name] = weakref.ref(class_)
+			logger.info(f'Found {class_.name!r} module {class_.__module__}.{class_.__name__}')
+		else:
+			logger.info(f'Found nameless module {class_.__module__}.{class_.__name__}')
+		return class_
+
+	@classmethod
+	def get_module_by_name(cls, name):
+		if classRef := cls.__modulesByName.get(name):
+			class_ = classRef()
+			if class_ is None:
+				logger.info(f'Module {name!r} is gone, dropping')
+				del cls.__modulesByName[name]
+			return class_
+
+	@classmethod
+	def iter_modules(cls):
+		# Housekeeping first: remove dead modules
+		for name in list(cls.__modulesByName): # create a copy of the names list so the dict can be modified in the loop
+			if cls.__modulesByName[name]() is None:
+				logger.info(f'Module {name!r} is gone, dropping')
+				del cls.__modulesByName[name]
+
+		for name, classRef in cls.__modulesByName.items():
+			class_ = classRef()
+			if class_ is None:
+				# Module class no longer exists, skip
+				# Even though dead modules are removed above, it's possible that the code consuming this iterator drops/deletes modules.
+				continue
+			yield class_
+
+	@classmethod
+	def drop(cls, module):
+		if module.name is not None and module.name in cls.__modulesByName:
+			del cls.__modulesByName[module.name]
+			logger.info(f'Module {module.name!r} dropped')
+
+	def __del__(self, *args, **kwargs):
+		if self.name is not None and self.name in type(self).__modulesByName:
+			logger.info(f'Module {self.name!r} is being destroyed, dropping')
+			del type(self).__modulesByName[self.name]
+		# type has no __del__ method, no need to call it.
+
+
+class Module(metaclass = ModuleMeta):
 	'''An abstract base class for a module.'''
 
 	name: typing.Optional[str] = None
-	'''The name of the module. Modules without a name are ignored, and names must be unique.'''
+	'''The name of the module. Modules without a name are ignored. Names must be unique and may only contain a-z, underscores, and hyphens.'''
 
 	@staticmethod
 	def matches(inputUrl: InputURL) -> bool:
@@ -157,39 +212,16 @@ def get_module_class(inputUrl: InputURL) -> typing.Type[Module]:
 	# This can't be done at the top because the modules need to refer back to the Module class.
 	import codearchiver.modules
 
-	# Collect all the Module subclasses and names
-	modules = set()
-	modulesByName = {} # name: str -> List[Module]
-	q = queue.Queue()
-	q.put_nowait(Module)
-	while not q.empty():
-		class_ = q.get_nowait()
-		for c in class_.__subclasses__():
-			if c.name is not None:
-				logger.debug(f'Found {c.name!r} module {c.__module__}.{c.__name__}')
-				modules.add(c)
-				if c.name not in modulesByName:
-					modulesByName[c.name] = []
-				modulesByName[c.name].append(c)
-			else:
-				logger.debug(f'Found nameless module {c.__module__}.{c.__name__}')
-			q.put_nowait(c)
-
-	# Verify that there are no module name collisions
-	if any(len(x) > 1 for x in modulesByName.values()):
-		raise RuntimeError(f'Found multiple modules with the same name')
-
 	# Check if the URL references one of the modules directly
 	if inputUrl.moduleScheme:
-		if inputUrl.moduleScheme in modulesByName:
-			module = modulesByName[inputUrl.moduleScheme][0]
+		if module := ModuleMeta.get_module_by_name(inputUrl.moduleScheme):
 			logger.info(f'Selecting module {module.__module__}.{module.__name__}')
 			return module
 		else:
 			raise RuntimeError(f'No module with name {inputUrl.moduleScheme!r} exists')
 
 	# Check if exactly one of the modules matches
-	matches = [class_ for class_ in modules if class_.matches(inputUrl)]
+	matches = [class_ for class_ in ModuleMeta.iter_modules() if class_.matches(inputUrl)]
 	if len(matches) >= 2:
 		logger.error('Multiple matching modules for input URL')
 		logger.debug(f'Matching modules: {matches!r}')