import abc
import collections
#import codearchiver.modules # In get_module_class
import codearchiver.version
import dataclasses
import logging
import queue
import requests
import time
import typing
import weakref


logger = logging.getLogger(__name__)


class InputURL:
    '''A URL given as input, optionally prefixed with a module name in the form `<module>+<url>`.'''

    def __init__(self, url):
        '''
        Split an optional module prefix off `url`.

        A prefix is only recognised if '+' and '://' appear in this order, with at least one
        character before the '+' and at least one character between the '+' and the '://'.
        Otherwise, the whole string is treated as the URL and `moduleScheme` is `None`.
        '''
        plusPos = url.find('+')
        schemePos = url.find('://')
        # `plusPos + 1 < schemePos` enforces the non-empty part between '+' and '://';
        # a plain `plusPos < schemePos` would accept e.g. 'a+://x' and yield the bogus URL '://x'.
        if 0 < plusPos and plusPos + 1 < schemePos:
            self._moduleScheme, self._url = url.split('+', 1)
        else:
            self._moduleScheme = None
            self._url = url
        self._response = None

    @property
    def url(self):
        '''The URL without the module prefix'''
        return self._url

    @property
    def moduleScheme(self):
        '''The module name prefixed to the URL, or `None` if there was no prefix'''
        return self._moduleScheme

    @property
    def content(self):
        '''The text of the HTTP response for `url`; the response is retrieved on first access and cached.'''
        if self._response is None:
            self._response = HttpClient().get(self.url)
        return self._response.text

    def __repr__(self):
        return f'{type(self).__module__}.{type(self).__name__}({self._url!r})'


@dataclasses.dataclass
class Result:
    '''Container for the result of a module'''

    id: str
    '''A unique ID for this result'''

    files: typing.List[str] = dataclasses.field(default_factory = list)
    '''List of filenames produced by the run'''

    submoduleResults: typing.List[typing.Tuple['Module', 'Result']] = dataclasses.field(default_factory = list)
    '''List of related submodules and their results'''


class HttpError(Exception):
    '''Raised by `HttpClient.request` when all attempts at a request have failed'''


class HttpClient:
    '''A thin wrapper around a `requests.Session` adding a default user agent, retries with exponential backoff, and logging.'''

    defaultRetries: int = 3
    defaultUserAgent: str = f'codearchiver/{codearchiver.version.__version__}'

    def __init__(self, retries = None, userAgent = None):
        '''
        `retries`: number of retries after the first attempt; `None` selects `defaultRetries`, 0 disables retrying.
        `userAgent`: value of the User-Agent header; a false value selects `defaultUserAgent`.

        Raises `ValueError` if the number of retries is negative.
        '''
        self._session = requests.Session()
        # Compare against None rather than truthiness so that an explicit 0 is honoured.
        self._retries = self.defaultRetries if retries is None else retries
        if self._retries < 0:
            raise ValueError(f'retries must be non-negative, got {self._retries!r}')
        self._userAgent = userAgent if userAgent else self.defaultUserAgent

    def request(self, method, url, params = None, data = None, headers = None, timeout = 10, responseOkCallback = None):
        '''
        Perform an HTTP request, retrying on failure.

        `params`, `data`, and `headers` are passed to `requests.Request`; `headers` is merged over the client's User-Agent header.
        `timeout` is passed to `requests.Session.send`.
        `responseOkCallback`, if given, is called with the response and must return a tuple `(success, msg)`;
        an unsuccessful response is treated like a failed attempt, and `msg` (if truthy) is included in the log.

        Returns the response of the first successful attempt.
        Raises `HttpError` if all `retries + 1` attempts fail.
        '''
        mergedHeaders = {'User-Agent': self._userAgent}
        if headers:
            mergedHeaders.update(headers)
        headers = mergedHeaders

        for attempt in range(self._retries + 1):
            canRetry = attempt < self._retries
            retrying, level = (', retrying', logging.WARNING) if canRetry else ('', logging.ERROR)

            # The request is newly prepared on each retry because of potential cookie updates.
            req = self._session.prepare_request(requests.Request(method, url, params = params, data = data, headers = headers))
            logger.info(f'Retrieving {req.url}')
            logger.debug(f'... with headers: {headers!r}')
            if data:
                logger.debug(f'... with data: {data!r}')

            try:
                r = self._session.send(req, timeout = timeout)
            except requests.exceptions.RequestException as exc:
                logger.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
            else:
                if responseOkCallback is not None:
                    success, msg = responseOkCallback(r)
                else:
                    success, msg = (True, None)
                msg = f': {msg}' if msg else ''
                if success:
                    logger.debug(f'{req.url} retrieved successfully{msg}')
                    return r
                logger.log(level, f'Error retrieving {req.url}{msg}{retrying}')

            if canRetry:
                sleepTime = 1.0 * 2**attempt # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc.
                logger.info(f'Waiting {sleepTime:.0f} seconds')
                time.sleep(sleepTime)
            else:
                msg = f'{self._retries + 1} requests to {req.url} failed, giving up.'
                logger.critical(msg)
                raise HttpError(msg)
        raise RuntimeError('Reached unreachable code')

    def get(self, *args, **kwargs):
        '''Perform a GET request; arguments are passed through to `request`.'''
        return self.request('GET', *args, **kwargs)

    def post(self, *args, **kwargs):
        '''Perform a POST request; arguments are passed through to `request`.'''
        return self.request('POST', *args, **kwargs)


class ModuleMeta(type):
    '''Metaclass of `Module` that keeps a registry of all named module classes without keeping them alive.'''

    __modulesByName = {} # name -> weak reference to the Module class

    def __new__(cls, *args, **kwargs):
        '''Create the class and, if it has a name, validate the name and register the class under it.'''
        class_ = super().__new__(cls, *args, **kwargs)
        if class_.name is not None:
            if class_.name.strip('abcdefghijklmnopqrstuvwxyz_-') != '':
                raise RuntimeError(f'Invalid class name: {class_.name!r}')
            if class_.name in cls.__modulesByName:
                raise RuntimeError(f'Class name collision: {class_.name!r} is already known')
            cls.__modulesByName[class_.name] = weakref.ref(class_)
            logger.info(f'Found {class_.name!r} module {class_.__module__}.{class_.__name__}')
        else:
            logger.info(f'Found nameless module {class_.__module__}.{class_.__name__}')
        return class_

    @classmethod
    def get_module_by_name(cls, name):
        '''Return the module class registered as `name`, or `None` if there is none or it no longer exists.'''
        classRef = cls.__modulesByName.get(name)
        if classRef is None:
            return None
        class_ = classRef()
        if class_ is None:
            logger.info(f'Module {name!r} is gone, dropping')
            del cls.__modulesByName[name]
        return class_

    @classmethod
    def iter_modules(cls):
        '''Iterate over all registered module classes that still exist.'''
        # Housekeeping first: remove dead modules
        for name, classRef in list(cls.__modulesByName.items()): # create a copy so the dict can be modified in the loop
            if classRef() is None and cls.__modulesByName.get(name) is classRef:
                logger.info(f'Module {name!r} is gone, dropping')
                del cls.__modulesByName[name]

        # Iterate over a snapshot: the code consuming this iterator may drop/delete modules between
        # two steps, and iterating the live dict would then fail with 'dictionary changed size during iteration'.
        for name, classRef in list(cls.__modulesByName.items()):
            if cls.__modulesByName.get(name) is not classRef:
                # Dropped or replaced since the snapshot was taken, skip
                continue
            class_ = classRef()
            if class_ is None:
                # Module class no longer exists, skip
                continue
            yield class_

    @classmethod
    def drop(cls, module):
        '''Remove `module` from the registry if it is (still) the class registered under its name.'''
        if module.name is None:
            return
        classRef = cls.__modulesByName.get(module.name)
        if classRef is None:
            return
        registered = classRef()
        # Only remove a dead entry or the entry of this very class, never the entry of a
        # different class that was registered under the same name after an earlier drop.
        if registered is None or registered is module:
            del cls.__modulesByName[module.name]
            logger.info(f'Module {module.name!r} dropped')

    def __del__(self, *args, **kwargs):
        '''Remove the registry entry of a module class that is being destroyed.'''
        if self.name is None:
            return
        registry = type(self).__modulesByName
        classRef = registry.get(self.name)
        if classRef is None:
            return
        registered = classRef()
        # The weak reference to this class may already be dead at this point, hence the None check.
        # A live reference to another class means the name was reused after a drop; leave that entry alone.
        if registered is None or registered is self:
            logger.info(f'Module {self.name!r} is being destroyed, dropping')
            del registry[self.name]
        # type has no __del__ method, no need to call it.


class Module(metaclass = ModuleMeta):
    '''An abstract base class for a module.'''

    name: typing.Optional[str] = None
    '''The name of the module. Modules without a name are ignored. Names must be unique and may only contain a-z, underscores, and hyphens.'''

    @staticmethod
    def matches(inputUrl: InputURL) -> bool:
        '''Whether or not this module is for handling `inputUrl`.'''
        return False

    def __init__(self, inputUrl, id_ = None):
        '''Store the input URL and optional ID and create an `HttpClient` for this module instance.'''
        self._inputUrl = inputUrl
        self._url = inputUrl.url
        self._id = id_
        self._httpClient = HttpClient()

    # NOTE(review): ModuleMeta derives from type rather than abc.ABCMeta, so this decorator does not
    # prevent instantiating a subclass that lacks `process`; calling it then simply returns None.
    @abc.abstractmethod
    def process(self) -> Result:
        '''Perform the relevant retrieval(s)'''

    def __repr__(self):
        return f'{type(self).__module__}.{type(self).__name__}({self._inputUrl!r})'


def get_module_class(inputUrl: InputURL) -> typing.Type[Module]:
    '''
    Get the Module class most suitable for handling `inputUrl`.

    If the URL carries a module prefix, the module of that name is returned. Otherwise, the single
    module whose `matches` accepts the URL is returned.
    Raises `RuntimeError` if the named module does not exist or if not exactly one module matches.
    '''
    # Ensure that modules are imported
    # This can't be done at the top because the modules need to refer back to the Module class.
    import codearchiver.modules

    # Check if the URL references one of the modules directly
    if inputUrl.moduleScheme:
        module = ModuleMeta.get_module_by_name(inputUrl.moduleScheme)
        if not module:
            raise RuntimeError(f'No module with name {inputUrl.moduleScheme!r} exists')
        logger.info(f'Selecting module {module.__module__}.{module.__name__}')
        return module

    # Check if exactly one of the modules matches
    matches = [class_ for class_ in ModuleMeta.iter_modules() if class_.matches(inputUrl)]
    if len(matches) >= 2:
        logger.error('Multiple matching modules for input URL')
        logger.debug(f'Matching modules: {matches!r}')
        raise RuntimeError('Multiple matching modules for input URL')
    if not matches:
        raise RuntimeError('No matching modules for input URL')
    logger.info(f'Selecting module {matches[0].__module__}.{matches[0].__name__}')
    return matches[0]


def get_module_instance(inputUrl: InputURL, **kwargs) -> Module:
    '''Get an instance of the Module class most suitable for handling `inputUrl`; `kwargs` are passed to its constructor.'''
    return get_module_class(inputUrl)(inputUrl, **kwargs)