|
- import abc
- import collections
- #import codearchiver.modules # In get_module_class
- import codearchiver.version
- import dataclasses
- import logging
- import queue
- import requests
- import time
- import typing
-
-
- logger = logging.getLogger(__name__)  # Module-level logger, named after this module via __name__
-
-
class InputURL:
    '''A URL together with its lazily retrieved response.'''

    def __init__(self, url):
        # No request is made here; the response is fetched on first access of `content`.
        self._response = None
        self._url = url

    @property
    def url(self):
        '''The URL this object was created with.'''
        return self._url

    @property
    def content(self):
        '''The `text` of the response for `url`.

        The first access performs a GET request through a fresh `HttpClient`;
        the response object is kept and reused on later accesses.
        '''
        response = self._response
        if response is None:
            response = self._response = HttpClient().get(self.url)
        return response.text
-
-
@dataclasses.dataclass
class Result:
    '''Holds what a module produced when it was run'''

    id: str
    '''Unique identifier of this result'''

    files: typing.List[str] = dataclasses.field(default_factory=list)
    '''Names of the files the run produced; empty by default'''

    submoduleResults: typing.List[typing.Tuple['Module', 'Result']] = dataclasses.field(default_factory=list)
    '''(submodule, result) pairs for related submodules; empty by default'''
-
-
class HttpError(Exception):
    '''Raised by `HttpClient.request` once every attempt at a request has failed.'''
-
-
class HttpClient:
    '''A `requests.Session` wrapper adding a default User-Agent, retries with exponential backoff, and logging.'''

    defaultRetries: int = 3
    defaultUserAgent: str = f'codearchiver/{codearchiver.version.__version__}'

    def __init__(self, retries=None, userAgent=None):
        '''Create a client with its own `requests.Session`.

        retries: how many times a failed request is retried after the first attempt.
            `None` selects `defaultRetries`; `0` disables retrying.
        userAgent: value of the User-Agent header; a falsy value selects `defaultUserAgent`.

        Raises ValueError if `retries` is negative.
        '''
        # Compare against None rather than truthiness so that an explicit `retries=0` is honoured.
        if retries is None:
            retries = self.defaultRetries
        # A negative count would make the attempt loop in `request` run zero times.
        if retries < 0:
            raise ValueError(f'retries must be non-negative, got {retries!r}')
        self._session = requests.Session()
        self._retries = retries
        self._userAgent = userAgent if userAgent else self.defaultUserAgent

    def request(self, method, url, params=None, data=None, headers=None, timeout=10, responseOkCallback=None):
        '''Send a request, retrying on failure, and return the response.

        method, url, params, data: passed to `requests.Request`.
        headers: extra headers; they are merged over the client's User-Agent header and win on conflict.
        timeout: passed through to `requests.Session.send`.
        responseOkCallback: optional callable taking the response and returning a `(success, msg)` tuple.
            A false `success` makes the attempt count as failed; `msg`, if truthy, is appended to the log line.

        An attempt fails if sending raises `requests.exceptions.RequestException` or the callback
        reports failure. Between attempts the client sleeps 1, 2, 4, ... seconds.

        Raises HttpError once all `retries + 1` attempts have failed. If the last attempt failed
        with an exception, that exception is attached as the cause.
        '''
        mergedHeaders = {'User-Agent': self._userAgent}
        if headers:
            mergedHeaders.update(headers)
        headers = mergedHeaders
        for attempt in range(self._retries + 1):
            canRetry = attempt < self._retries
            retrying = ', retrying' if canRetry else ''
            level = logging.WARNING if canRetry else logging.ERROR
            lastExc = None  # exception raised by this attempt, if any
            # The request is newly prepared on each retry because of potential cookie updates.
            req = self._session.prepare_request(requests.Request(method, url, params=params, data=data, headers=headers))
            logger.info(f'Retrieving {req.url}')
            logger.debug(f'... with headers: {headers!r}')
            if data:
                logger.debug(f'... with data: {data!r}')
            try:
                r = self._session.send(req, timeout=timeout)
            except requests.exceptions.RequestException as exc:
                lastExc = exc
                logger.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
            else:
                if responseOkCallback is not None:
                    success, msg = responseOkCallback(r)
                else:
                    success, msg = True, None
                msg = f': {msg}' if msg else ''
                if success:
                    logger.debug(f'{req.url} retrieved successfully{msg}')
                    return r
                logger.log(level, f'Error retrieving {req.url}{msg}{retrying}')
            if canRetry:
                sleepTime = 1.0 * 2**attempt  # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc.
                logger.info(f'Waiting {sleepTime:.0f} seconds')
                time.sleep(sleepTime)
            else:
                msg = f'{self._retries + 1} requests to {req.url} failed, giving up.'
                logger.critical(msg)
                # Chain the underlying error (if the last attempt raised one) so it is not lost.
                raise HttpError(msg) from lastExc
        raise RuntimeError('Reached unreachable code')

    def get(self, *args, **kwargs):
        '''Shorthand for `request('GET', ...)`.'''
        return self.request('GET', *args, **kwargs)

    def post(self, *args, **kwargs):
        '''Shorthand for `request('POST', ...)`.'''
        return self.request('POST', *args, **kwargs)
-
-
class Module(abc.ABC):
    '''An abstract base class for a module.

    Subclasses must implement `process`; deriving from `abc.ABC` is what makes
    `@abc.abstractmethod` actually prevent instantiation of incomplete classes.
    '''

    @staticmethod
    def matches(inputUrl: 'InputURL') -> bool:
        '''Whether or not this module is for handling `inputUrl`.

        The base implementation always returns False; subclasses override it.
        '''
        return False

    def __init__(self, inputUrl):
        '''Store `inputUrl` and its URL, and create an `HttpClient` for this module.'''
        self._inputUrl = inputUrl
        self._url = inputUrl.url
        self._httpClient = HttpClient()

    @abc.abstractmethod
    def process(self) -> 'Result':
        '''Perform the relevant retrieval(s)'''
-
-
def get_module_class(inputUrl: 'InputURL') -> typing.Optional[typing.Type['Module']]:
    '''Get the Module class most suitable for handling `inputUrl`.

    All direct and indirect subclasses of `Module` are considered. Classes deeper in the
    inheritance tree are tried first; within one level the first class whose `matches`
    returns true wins (a warning is logged if several match).

    Returns the selected class, or None if no module matches.
    '''

    # Ensure that modules are imported
    # This can't be done at the top because the modules need to refer back to the Module class.
    import codearchiver.modules

    # Collect all the Module subclasses and their inheritance level (breadth-first)
    modules = {}  # Module -> level:int
    pending = collections.deque([(Module, 0)])
    while pending:
        class_, level = pending.popleft()
        for c in class_.__subclasses__():
            logger.debug(f'Found module {c.__module__}.{c.__name__} at level {level + 1}')
            modules[c] = level + 1  # Implicitly only keeps the highest level, i.e. deepest inheritance
            pending.append((c, level + 1))

    # Restructure into level->[modules] mapping
    levels = collections.defaultdict(list)
    for class_, level in modules.items():
        levels[level].append(class_)

    # Process in descending level order.
    # The keys must be sorted explicitly: a class reachable via several bases keeps its
    # original dict position when its level is raised, so insertion order is not level order.
    for level in sorted(levels, reverse=True):
        matches = [class_ for class_ in levels[level] if class_.matches(inputUrl)]
        if len(matches) >= 2:
            logger.warning('Multiple matching modules for input URL, using the first found')
            logger.debug(f'Matching modules at level {level}: {matches!r}')
            logger.debug(f'Modules: {levels!r}')
        if matches:
            logger.info(f'Selecting module {matches[0].__module__}.{matches[0].__name__}')
            return matches[0]

    return None
-
-
def get_module_instance(inputUrl: 'InputURL', **kwargs) -> 'Module':
    '''Get an instance of the Module class most suitable for handling `inputUrl`.

    `kwargs` are passed on to the selected class's constructor after `inputUrl`.

    Raises ValueError if no module matches `inputUrl`.
    '''
    moduleClass = get_module_class(inputUrl)
    if moduleClass is None:
        # Without this check the call below fails with "'NoneType' object is not callable".
        raise ValueError(f'No module found for input URL {inputUrl.url!r}')
    return moduleClass(inputUrl, **kwargs)
|