import abc
import collections
#import codearchiver.modules # In get_module_class
import codearchiver.version
import logging
import queue
import requests
import time
import typing


logger = logging.getLogger(__name__)


class InputURL:
    '''A URL to be archived, with lazily retrieved and cached content.'''

    def __init__(self, url):
        self._url = url
        # Populated on first access of `content`.
        self._response = None

    @property
    def url(self):
        '''The URL this object was created with.'''
        return self._url

    @property
    def content(self):
        '''The response body text of a GET request to `url`.

        The request is made with a fresh `HttpClient` on first access; the response is kept and reused afterwards.
        '''
        if self._response is None:
            self._response = HttpClient().get(self.url)
        return self._response.text


class Result(typing.NamedTuple):
    '''Container for the result of a module

    NOTE: the default `files` and `submodules` lists are single objects shared by every `Result` created without those arguments. Do not mutate them in place; pass fresh lists instead.
    '''

    id: str
    '''A unique ID for this result'''

    files: typing.List[str] = []
    '''List of filenames produced by the run'''

    submodules: typing.List['Module'] = []
    '''List of related submodules that need to be run as well'''


class HttpError(Exception):
    '''Raised by `HttpClient.request` when all attempts at a request have failed.'''


class HttpClient:
    '''A thin wrapper around a `requests.Session` adding a default user agent, retries, and exponential backoff.'''

    defaultRetries: int = 3
    defaultUserAgent: str = f'codearchiver/{codearchiver.version.__version__}'

    def __init__(self, retries = None, userAgent = None):
        '''
        retries: number of retries after the first attempt; `None` selects `defaultRetries`, `0` disables retrying.
        userAgent: User-Agent header value; a falsy value selects `defaultUserAgent`.
        '''
        self._session = requests.Session()
        # Compare against None so that an explicit 0 (no retries) is honoured rather than replaced by the default.
        self._retries = retries if retries is not None else self.defaultRetries
        self._userAgent = userAgent if userAgent else self.defaultUserAgent

    def _log_failure(self, attempt, message):
        '''Log a failed attempt: as a warning if another attempt follows, as an error if it was the last one.'''
        if attempt < self._retries:
            retrying = ', retrying'
            level = logging.WARNING
        else:
            retrying = ''
            level = logging.ERROR
        logger.log(level, f'{message}{retrying}')

    def request(self, method, url, params = None, data = None, headers = None, timeout = 10, responseOkCallback = None):
        '''Perform an HTTP request, retrying on failure.

        method, url, params, data: passed on to `requests.Request`.
        headers: extra headers; they are merged over the client's User-Agent header.
        timeout: passed on to `Session.send`.
        responseOkCallback: optional callable taking the response and returning a `(success, message)` tuple. An unsuccessful response is treated like a failed attempt. Without a callback, every response that was received counts as a success.

        Returns the response object of the first successful attempt.
        Raises `HttpError` once the initial attempt and all retries have failed.
        '''
        mergedHeaders = {'User-Agent': self._userAgent}
        if headers:
            mergedHeaders.update(headers)
        headers = mergedHeaders
        for attempt in range(self._retries + 1):
            # The request is newly prepared on each retry because of potential cookie updates.
            req = self._session.prepare_request(requests.Request(method, url, params = params, data = data, headers = headers))
            logger.info(f'Retrieving {req.url}')
            logger.debug(f'... with headers: {headers!r}')
            if data:
                logger.debug(f'... with data: {data!r}')
            try:
                r = self._session.send(req, timeout = timeout)
            except requests.exceptions.RequestException as exc:
                self._log_failure(attempt, f'Error retrieving {req.url}: {exc!r}')
            else:
                if responseOkCallback is not None:
                    success, msg = responseOkCallback(r)
                else:
                    success, msg = (True, None)
                msg = f': {msg}' if msg else ''
                if success:
                    logger.debug(f'{req.url} retrieved successfully{msg}')
                    return r
                self._log_failure(attempt, f'Error retrieving {req.url}{msg}')
            if attempt < self._retries:
                sleepTime = 1.0 * 2**attempt # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc.
                logger.info(f'Waiting {sleepTime:.0f} seconds')
                time.sleep(sleepTime)
            else:
                msg = f'{self._retries + 1} requests to {req.url} failed, giving up.'
                logger.critical(msg)
                raise HttpError(msg)
        raise RuntimeError('Reached unreachable code')

    def get(self, *args, **kwargs):
        '''Shorthand for `request('GET', ...)`.'''
        return self.request('GET', *args, **kwargs)

    def post(self, *args, **kwargs):
        '''Shorthand for `request('POST', ...)`.'''
        return self.request('POST', *args, **kwargs)


class Module:
    '''An abstract base class for a module.

    NOTE: `process` is marked with `abc.abstractmethod`, but this class does not use an ABC metaclass, so the marker is not enforced at instantiation time.
    '''

    @staticmethod
    def matches(inputUrl: InputURL) -> bool:
        '''Whether or not this module is for handling `inputUrl`.'''
        return False

    def __init__(self, inputUrl):
        self._inputUrl = inputUrl
        self._httpClient = HttpClient()

    @abc.abstractmethod
    def process(self) -> Result:
        '''Perform the relevant retrieval(s)'''


def get_module_class(inputUrl: InputURL) -> typing.Optional[typing.Type['Module']]:
    '''Get the Module class most suitable for handling `inputUrl`.

    Subclasses of `Module` are tried from the deepest inheritance level to the shallowest; the first class whose `matches` accepts `inputUrl` is returned. If several classes on the same level match, a warning is logged and the first one found is used.

    Returns `None` if no module matches.
    '''

    # Ensure that modules are imported
    # This can't be done at the top because the modules need to refer back to the Module class.
    import codearchiver.modules

    # Collect all the Module subclasses and their inheritance level (breadth-first)
    modules = {} # Module -> level:int
    pending = collections.deque([(Module, 0)])
    while pending:
        class_, level = pending.popleft()
        for c in class_.__subclasses__():
            logger.debug(f'Found module {c.__module__}.{c.__name__} at level {level + 1}')
            modules[c] = level + 1 # Implicitly only keeps the highest level, i.e. deepest inheritance
            pending.append((c, level + 1))

    # Restructure into level->[modules] mapping
    levels = collections.defaultdict(list)
    for class_, level in modules.items():
        levels[level].append(class_)

    # Process in descending level order
    # The keys must be sorted explicitly: a class reachable via several inheritance paths keeps its first insertion
    # position in `modules` while its level is raised, so the insertion order of `levels` is not necessarily ascending.
    for level in sorted(levels, reverse = True):
        matches = [class_ for class_ in levels[level] if class_.matches(inputUrl)]
        if len(matches) >= 2:
            logger.warning('Multiple matching modules for input URL, using the first found')
            logger.debug(f'Matching modules at level {level}: {matches!r}')
            logger.debug(f'Modules: {levels!r}')
        if matches:
            logger.info(f'Selecting module {matches[0].__module__}.{matches[0].__name__}')
            return matches[0]
    return None


def get_module_instance(inputUrl: InputURL, **kwargs) -> 'Module':
    '''Get an instance of the Module class most suitable for handling `inputUrl`.

    `kwargs` are passed on to the module's constructor. If no module matches, `get_module_class` returns `None` and the call here fails with a `TypeError`.
    '''
    return get_module_class(inputUrl)(inputUrl, **kwargs)