Source code for pydov.util.caching

# -*- coding: utf-8 -*-
"""Module implementing a local cache for downloaded XML files."""
import datetime
import gzip
import os
import re
import shutil
import tempfile
import warnings

from pydov.util.dovutil import build_dov_url, get_dov_xml
from pydov.util.errors import RemoteFetchError, XmlStaleWarning
from pydov.util.hooks import HookRunner


class AbstractCache(object):
    """Abstract base class for caching of downloaded XML files from DOV.

    Attributes
    ----------
    stale_on_error : bool, default to True
        Whether to return stale responses from the cache in case a network
        error prevents downloading a fresh copy.

    """

    def __init__(self):
        """Initialisation."""
        self.stale_on_error = True

    @staticmethod
    def _strip_xml_suffix(url):
        """Return the given URL without its trailing '.xml' extension.

        Parameters
        ----------
        url : str
            Permanent URL to a DOV object.

        Returns
        -------
        str
            The URL with one trailing '.xml' removed, or the unchanged URL
            if it does not end in '.xml'.

        """
        # str.rstrip('.xml') would strip any trailing run of the characters
        # '.', 'x', 'm' and 'l', eating into keys that end in one of them.
        # Only the literal suffix may be removed.
        suffix = '.xml'
        if url.endswith(suffix):
            return url[:-len(suffix)]
        return url

    def _get_remote(self, url, session=None):
        """Get the XML data by requesting it from the given URL.

        Parameters
        ----------
        url : str
            Permanent URL to a DOV object.
        session : requests.Session
            Session to use to perform HTTP requests for data. Defaults to
            None, which means a new session will be created for each request.

        Returns
        -------
        xml : bytes
            The raw XML data of this DOV object as bytes.

        """
        xml = get_dov_xml(url, session)
        # The hook is only reached when the download did not raise.
        HookRunner.execute_xml_downloaded(self._strip_xml_suffix(url))
        return xml

    def _emit_cache_hit(self, url):
        """Emit the XML cache hit event for all registered hooks.

        This notifies hooks that a valid XML document has been returned from
        the cache.

        Parameters
        ----------
        url : str
            Permanent URL to a DOV object.

        """
        HookRunner.execute_xml_cache_hit(self._strip_xml_suffix(url))

    def _emit_stale_hit(self, url):
        """Emit the XML stale hit event for all registered hooks.

        This notifies hooks that a stale XML document has been returned from
        the cache.

        Parameters
        ----------
        url : str
            Permanent URL to a DOV object.

        """
        HookRunner.execute_xml_stale_hit(self._strip_xml_suffix(url))

    def get(self, url, session=None):
        """Get the XML data for the DOV object referenced by the given URL.

        Because of parallel processing, this method will be called
        simultaneously from multiple threads. Make sure your implementation
        is threadsafe or uses locking.

        If a valid version exists in the cache, it will be loaded and
        returned. If no valid version exists, the XML will be downloaded
        from the DOV webservice, saved in the cache and returned.

        Parameters
        ----------
        url : str
            Permanent URL to a DOV object.
        session : requests.Session
            Session to use to perform HTTP requests for data. Defaults to
            None, which means a new session will be created for each request.

        Returns
        -------
        xml : bytes
            The raw XML data of this DOV object as bytes.

        Raises
        ------
        NotImplementedError
            Always, in this abstract base class.

        """
        raise NotImplementedError('This should be implemented in a subclass.')

    def clean(self):
        """Clean the cache by removing old records from the cache.

        Since during normal use the cache only grows by adding new objects
        and overwriting existing ones with a new version, you can use this
        function to clean the cache. It will remove all records older than
        the maximum age from the cache.

        Raises
        ------
        NotImplementedError
            Always, in this abstract base class.

        """
        raise NotImplementedError('This should be implemented in a subclass.')

    def remove(self):
        """Remove the entire cache.

        Raises
        ------
        NotImplementedError
            Always, in this abstract base class.

        """
        raise NotImplementedError('This should be implemented in a subclass.')
class AbstractFileCache(AbstractCache):
    """Abstract class for filebased caching of downloaded XML files from
    DOV."""

    def __init__(self, max_age=datetime.timedelta(weeks=2), cachedir=None):
        """Initialisation.

        Set up the instance variables and create the cache directory if
        it does not exist already.

        Parameters
        ----------
        max_age : datetime.timedelta, optional
            The maximum age of a cached XML file to be valid. If the last
            modification date of the file is before this time, it will be
            redownloaded. Defaults to two weeks.
        cachedir : str, optional
            Path of the directory that will be used to save the cached XML
            files. Be sure to use a directory that will only be used for
            this PyDOV cache. Defaults to a 'pydov' directory inside the
            temporary directory provided by the operating system.

        """
        super().__init__()

        if cachedir:
            self.cachedir = cachedir
        else:
            self.cachedir = os.path.join(tempfile.gettempdir(), 'pydov')

        self.max_age = max_age

        # The URL prefix is escaped so that its dots only match literal
        # dots; group 1 is the datatype, group 2 the object key.
        self._re_type_key = re.compile(
            re.escape(build_dov_url('data/')) + r'([^ /]+)/([^.]+)'
        )

        try:
            os.makedirs(self.cachedir, exist_ok=True)
        except Exception:
            # Deliberately best effort: failing to create the directory
            # must not prevent the cache object from being constructed.
            pass

    def _get_filepath(self, datatype, key):
        """Get the location on disk where the object with given datatype and
        key is to be saved.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object.
        key : str
            Unique and permanent object key of the DOV object.

        Returns
        -------
        str
            Full absolute path on disk where the object is to be saved.

        """
        raise NotImplementedError('This should be implemented in a subclass.')

    def _get_type_key_from_url(self, url):
        """Parse a DOV permalink and return the datatype and object key.

        Parameters
        ----------
        url : str
            Permanent URL to a DOV object.

        Returns
        -------
        tuple of (str, str) or None
            The datatype and the object key of the DOV object referred to
            by the URL, or None if the URL does not match the permalink
            pattern.

        """
        match = self._re_type_key.search(url)
        if match is None:
            return None
        return match.group(1), match.group(2)

    def _get_type_key_from_path(self, path):
        """Parse a filepath and return the datatype and object key.

        Parameters
        ----------
        path : str
            Full, absolute, path to a cached file.

        Returns
        -------
        datatype : str
            Datatype of the DOV object stored in the file.
        key : str
            Unique and permanent key of the instance of the DOV object
            stored in the file.

        """
        raise NotImplementedError('This should be implemented in a subclass.')

    def _is_valid(self, datatype, key):
        """Check if a valid version of the given DOV object exists in the
        cache.

        A cached version is valid if it exists and the last modification
        time of the file is after the maximum age defined on initialisation.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object.
        key : str
            Unique and permanent object key of the DOV object.

        Returns
        -------
        bool
            True if a valid cached version exists, False otherwise.

        """
        filepath = self._get_filepath(datatype, key)

        # Ask for the modification time directly instead of checking for
        # existence first: the file may disappear between the two calls.
        try:
            modified = os.path.getmtime(filepath)
        except OSError:
            return False

        last_modification = datetime.datetime.fromtimestamp(modified)
        age = datetime.datetime.now() - last_modification
        return age <= self.max_age

    def _is_stale(self, datatype, key):
        """Check if a stale version of the given DOV object exists in the
        cache.

        A cached version is stale if it exists and the last modification
        time of the file is before the maximum age defined on
        initialisation.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object.
        key : str
            Unique and permanent object key of the DOV object.

        Returns
        -------
        bool
            True if a stale cached version exists, False otherwise.

        """
        if self._is_valid(datatype, key):
            return False
        return os.path.exists(self._get_filepath(datatype, key))

    def _load(self, datatype, key):
        """Read a cached version from disk.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object.
        key : str
            Unique and permanent object key of the DOV object.

        Returns
        -------
        str (xml)
            XML string of the DOV object, loaded from the cache.

        """
        raise NotImplementedError('This should be implemented in a subclass.')

    def _save(self, datatype, key, content):
        """Save the given content in the cache.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object to save.
        key : str
            Unique and permanent object key of the DOV object to save.
        content : bytes
            The raw XML data of this DOV object as bytes.

        """
        raise NotImplementedError('This should be implemented in a subclass.')

    def get(self, url, session=None):
        """Get the XML data for the DOV object referenced by the given URL.

        The sources are tried in this order: a response injected by a hook,
        a valid cached version, a fresh download (which is then saved in
        the cache) and, when the download fails and `stale_on_error` is
        set, a stale cached version.

        Parameters
        ----------
        url : str
            Permanent URL to a DOV object.
        session : requests.Session
            Session to use to perform HTTP requests for data. Defaults to
            None, which means a new session will be created for each request.

        Returns
        -------
        xml : bytes
            The raw XML data of this DOV object as bytes.

        Raises
        ------
        RemoteFetchError
            If the download failed and no stale version could be used.

        Warns
        -----
        XmlStaleWarning
            If a stale version from the cache is returned.

        """
        type_key = self._get_type_key_from_url(url)

        data = HookRunner.execute_inject_xml_response(url)
        if data is not None:
            HookRunner.execute_xml_received(url, data)
            return data

        if type_key is None:
            # The URL is not a recognisable permalink, so there is no cache
            # location for it: fetch it without touching the cache instead
            # of failing on unpacking None.
            try:
                return self._get_remote(url, session)
            except RemoteFetchError:
                HookRunner.execute_xml_fetch_error(url)
                raise

        datatype, key = type_key

        if self._is_valid(datatype, key):
            try:
                self._emit_cache_hit(url)
                data = self._load(datatype, key).encode('utf-8')
                HookRunner.execute_xml_received(url, data)
                return data
            except Exception:
                # Deliberately best effort: any failure while serving from
                # the cache falls through to a fresh download.
                pass

        try:
            data = self._get_remote(url, session)
        except RemoteFetchError:
            if self.stale_on_error and self._is_stale(datatype, key):
                self._emit_stale_hit(url)
                warnings.warn((
                    "Failed to fetch remote XML document for "
                    "object '{}', using older stale version from cache. "
                    "Resulting dataframe will be out-of-date.".format(url)),
                    XmlStaleWarning)
                return self._load(datatype, key).encode('utf-8')

            HookRunner.execute_xml_fetch_error(url)
            # Re-raise the original error so its message and traceback
            # are preserved.
            raise

        try:
            self._save(datatype, key, data)
        except Exception:
            # Deliberately best effort: a failed write must not prevent
            # returning the freshly downloaded data.
            pass

        return data

    def clean(self):
        """Clean the cache by removing old records from the cache.

        Since during normal use the cache only grows by adding new objects
        and overwriting existing ones with a new version, you can use this
        function to clean the cache. It will remove all records older than
        the maximum age from the cache.

        Only files that map back to their own cache location are
        considered; other entries in the cache directory are left alone.

        Note that this method is currently not called anywhere in the code,
        but it is provided as reference.

        """
        if not os.path.isdir(self.cachedir):
            return

        for type_dir in os.listdir(self.cachedir):
            type_path = os.path.join(self.cachedir, type_dir)
            if not os.path.isdir(type_path):
                # Stray file directly in the cache directory.
                continue

            for filename in os.listdir(type_path):
                filepath = os.path.join(type_path, filename)
                datatype, key = self._get_type_key_from_path(filepath)

                # Only treat the file as a cache record if its parsed
                # datatype and key lead back to this very file; otherwise
                # the validity check below would look at another path.
                expected = self._get_filepath(datatype, key)
                if os.path.abspath(expected) != os.path.abspath(filepath):
                    continue

                if not self._is_valid(datatype, key):
                    os.remove(filepath)

    def remove(self):
        """Remove the entire cache directory.

        Note that the default directory to save the cache is a temporary
        location provided by the operating system, and as a consequence the
        OS will normally take care of its removal.

        Note that this method is currently not called anywhere in the code,
        but it is provided as reference.

        """
        if os.path.exists(self.cachedir):
            shutil.rmtree(self.cachedir)
class PlainTextFileCache(AbstractFileCache):
    """Class for plain text caching of downloaded XML files from DOV."""

    def _get_filepath(self, datatype, key):
        """Get the path of the cached file: <cachedir>/<datatype>/<key>.xml.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object.
        key : str
            Unique and permanent object key of the DOV object.

        Returns
        -------
        str
            Path on disk where the object is to be saved.

        """
        return os.path.join(self.cachedir, datatype, key + '.xml')

    def _get_type_key_from_path(self, path):
        """Parse a filepath and return the datatype and object key.

        Parameters
        ----------
        path : str
            Path to a cached file, as built by `_get_filepath`.

        Returns
        -------
        datatype : str
            Name of the directory containing the file.
        key : str
            Filename without its '.xml' extension.

        """
        extension = '.xml'
        key = os.path.basename(path)
        # Remove the literal extension only: str.rstrip('.xml') would also
        # strip trailing '.', 'x', 'm' and 'l' characters of the key itself.
        if key.endswith(extension):
            key = key[:-len(extension)]

        # The datatype is the name of the parent directory; str.split()
        # without arguments splits on whitespace, not on path separators.
        datatype = os.path.basename(os.path.dirname(path))
        return datatype, key

    def _save(self, datatype, key, content):
        """Save the given content in the cache as UTF-8 encoded text.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object to save.
        key : str
            Unique and permanent object key of the DOV object to save.
        content : bytes
            The raw XML data of this DOV object as bytes.

        """
        filepath = self._get_filepath(datatype, key)
        # exist_ok avoids failing when another thread creates the folder
        # between an existence check and the creation.
        os.makedirs(os.path.dirname(filepath), exist_ok=True)

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content.decode('utf-8'))

    def _load(self, datatype, key):
        """Read a cached version from disk.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object.
        key : str
            Unique and permanent object key of the DOV object.

        Returns
        -------
        str (xml)
            XML string of the DOV object, loaded from the cache.

        """
        filepath = self._get_filepath(datatype, key)
        with open(filepath, 'r', encoding='utf-8') as f:
            return f.read()
class GzipTextFileCache(AbstractFileCache):
    """Class for GZipped text caching of downloaded XML files from DOV."""

    def _get_filepath(self, datatype, key):
        """Get the path of the cached file: <cachedir>/<datatype>/<key>.xml.gz.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object.
        key : str
            Unique and permanent object key of the DOV object.

        Returns
        -------
        str
            Path on disk where the object is to be saved.

        """
        return os.path.join(self.cachedir, datatype, key + '.xml.gz')

    def _get_type_key_from_path(self, path):
        """Parse a filepath and return the datatype and object key.

        Parameters
        ----------
        path : str
            Path to a cached file, as built by `_get_filepath`.

        Returns
        -------
        datatype : str
            Name of the directory containing the file.
        key : str
            Filename without its '.xml.gz' extension.

        """
        extension = '.xml.gz'
        key = os.path.basename(path)
        # Remove the literal extension only: str.rstrip('.xml.gz') would
        # also strip trailing '.', 'x', 'm', 'l', 'g' and 'z' characters of
        # the key itself.
        if key.endswith(extension):
            key = key[:-len(extension)]

        # The datatype is the name of the parent directory; str.split()
        # without arguments splits on whitespace, not on path separators.
        datatype = os.path.basename(os.path.dirname(path))
        return datatype, key

    def _save(self, datatype, key, content):
        """Save the given content in the cache as a gzip compressed file.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object to save.
        key : str
            Unique and permanent object key of the DOV object to save.
        content : bytes
            The raw XML data of this DOV object as bytes.

        """
        filepath = self._get_filepath(datatype, key)
        # exist_ok avoids failing when another thread creates the folder
        # between an existence check and the creation.
        os.makedirs(os.path.dirname(filepath), exist_ok=True)

        with gzip.open(filepath, 'wb') as f:
            f.write(content)

    def _load(self, datatype, key):
        """Read a cached version from disk.

        Parameters
        ----------
        datatype : str
            Datatype of the DOV object.
        key : str
            Unique and permanent object key of the DOV object.

        Returns
        -------
        str (xml)
            XML string of the DOV object, decompressed and decoded as
            UTF-8.

        """
        filepath = self._get_filepath(datatype, key)
        with gzip.open(filepath, 'rb') as f:
            return f.read().decode('utf-8')