# -*- coding: utf-8 -*-
"""Module containing the base DOV data types."""
import types
import warnings
from collections import OrderedDict
import numpy as np
from owslib.etree import etree
import pydov
from pydov.search.abstract import AbstractCommon
from pydov.types.fields import AbstractField, ReturnFieldList
from pydov.util import owsutil
from pydov.util.dovutil import get_dov_xml, parse_dov_xml
from pydov.util.errors import RemoteFetchError, XmlFetchWarning
from pydov.util.net import LocalSessionThreadPool
from ..util.errors import InvalidFieldError, XmlParseError, XmlParseWarning
from ..util.hooks import HookRunner
[docs]
class AbstractTypeCommon(AbstractCommon):
    """Class grouping methods common to AbstractDovType and
    AbstractDovSubType.

    Attributes
    ----------
    fields : list of pydov.types.fields.AbstractField
        List of fields of this type.

    """

    fields = []

    @classmethod
    def _parse(cls, func, xpath, namespace, returntype):
        """Call an XML path function on a (namespace-qualified) path and
        convert the result to the requested type.

        Parameters
        ----------
        func : reference to function
            Function to call, it receives the final path as its only
            argument.
        xpath : str
            XML path of the element, relative to the element `func` is bound
            to. Leading slashes are ignored.
        namespace : str or None
            Namespace to be added to each step in the `xpath`. None to use
            the xpath as is.
        returntype : str
            Convert the value found with `func` to this output datatype. One
            of `string`, `float`, `integer`, `date`, `datetime`, `boolean`.

        Returns
        -------
        str or float or int or bool or datetime.date or datetime.datetime
            The value returned by `func`, converted by `cls._typeconvert`
            (not defined in this class, expected to be provided by a base
            class) to the type described by `returntype`. `numpy.nan` when
            `func` returned None.

        """
        # Leading slashes are dropped in both branches. Without this, an
        # xpath such as '/a/b' would be namespace-qualified into
        # './{ns}/{ns}a/{ns}b', whose empty first step can never match.
        steps = xpath.lstrip('/').split('/')

        if namespace is not None:
            ns = '{{{}}}'.format(namespace)
            steps = [ns + step for step in steps]

        text = func('./' + '/'.join(steps))

        if text is None:
            return np.nan

        return cls._typeconvert(text, returntype)

    @classmethod
    def extend_fields(cls, extra_fields):
        """Extend the fields of this type with given extra fields and return
        the new fieldset.

        The `fields` attribute of the class itself is left untouched: the
        result is a new list.

        Parameters
        ----------
        extra_fields : list of pydov.types.fields.AbstractField
            Extra fields to be appended to the existing fields of this type.

        Returns
        -------
        list of pydov.types.fields.AbstractField
            List of the existing fields of this type, extended with the
            extra fields supplied in extra_fields.

        """
        fields = list(cls.fields)
        fields.extend(extra_fields)
        return fields
[docs]
class AbstractDovSubType(AbstractTypeCommon):
    """Abstract base for all DOV subtypes, grouping their shared fields and
    methods. Not to be instantiated or used directly.

    Attributes
    ----------
    rootpath : str
        XPath expression of the root element of this subtype. Should return
        all elements of this subtype.

    Raises
    ------
    RuntimeError
        When the defined fields of this type are invalid.

    """

    rootpath = None

    # Placeholder stored for every field until a real value is parsed.
    _UNRESOLVED = "{UNRESOLVED}"

    def __init__(self):
        """Validate the field definitions and mark every field as
        unresolved.

        Raises
        ------
        RuntimeError
            When one of the entries in `fields` is not an instance of
            pydov.types.fields.AbstractField.

        """
        for candidate in self.fields:
            if isinstance(candidate, AbstractField):
                continue
            raise RuntimeError(
                "Subtype '{}' fields should be instances of "
                "pydov.types.fields.AbstractField, found {}.".format(
                    self.__class__.__name__, str(type(candidate))))

        self.data = {
            column: AbstractDovSubType._UNRESOLVED
            for column in self.get_field_names()
        }

    @classmethod
    def from_xml(cls, xml_data):
        """Build instances of this subtype from XML data.

        Parameters
        ----------
        xml_data : bytes
            Raw XML data of the DOV object that contains information about
            this subtype.

        Yields
        ------
        An instance of this type for each occurrence of the rootpath in
        the XML document. Nothing is yielded when the document cannot be
        parsed.

        """
        try:
            document = parse_dov_xml(xml_data)
            for node in document.findall(cls.rootpath):
                yield cls.from_xml_element(node)
        except XmlParseError:
            # Deliberately silent: main type and subtypes live in the same
            # XML file, so the main type is assumed to report the failure.
            return

    @classmethod
    def from_xml_element(cls, element):
        """Build an instance of this subtype from a single XML element.

        Parameters
        ----------
        element : etree.Element
            XML element representing a single record of this subtype.

        Returns
        -------
        instance of this class
            An instance of this class based on the data in the XML element.

        """
        record = cls()

        for meta in cls.get_fields().values():
            value = record._parse(
                func=element.findtext,
                xpath=meta['sourcefield'],
                namespace=None,
                returntype=meta.get('type', None)
            )
            record.data[meta['name']] = value

        return record

    @classmethod
    def get_field_names(cls):
        """Return the names of the fields available for this type.

        Returns
        -------
        list of str
            List of the field names available for this type, in the order
            of the `fields` attribute. These are also the names of the
            columns in the output dataframe for this type.

        """
        names = []
        for meta in cls.fields:
            names.append(meta['name'])
        return names

    @classmethod
    def get_fields(cls):
        """Return the metadata of the fields available for this type.

        Returns
        -------
        collections.OrderedDict mapping str to dict
            Ordered dictionary mapping the field (column) name to the
            dictionary containing the metadata of this field.

            This metadata dictionary includes at least:

            name (str)
                The name of the field in the output data.

            source (str)
                The source of the field (either `wfs` or `xml`).

            sourcefield (str)
                The name of the field in the source (source + sourcefield
                identify the origin of the data).

            type (str)
                Datatype of the output data field (one of `string`, `float`,
                `integer`, `date`, `datetime`).

            definition (str)
                The definition of the field.

            notnull (boolean)
                Whether the field is mandatory (True) or can be null (False).

        """
        return OrderedDict((meta['name'], meta) for meta in cls.fields)

    @classmethod
    def get_name(cls):
        """Return the name associated with this subtype.

        Returns
        -------
        name : str
            The name associated with this subtype, being the name of the
            class.

        """
        return cls.__name__
[docs]
class AbstractDovType(AbstractTypeCommon):
    """Abstract DOV type grouping fields and methods common to all DOV
    object types. Not to be instantiated or used directly.

    Attributes
    ----------
    subtypes : list of subclass of pydov.types.abstract.AbstractDovSubType
        List of subtypes of this type.
    fields : list of pydov.types.fields.AbstractField
        List of fields of this type.
    pkey_fieldname : str or None
        Name of the WFS field holding the permanent key. None to use the
        `gml:id` attribute of the WFS feature instead.

    """

    # Placeholder stored for every field until a real value is parsed.
    _UNRESOLVED = "{UNRESOLVED}"

    subtypes = []
    fields = []
    pkey_fieldname = None

    def __init__(self, typename, pkey):
        """Initialisation.

        Parameters
        ----------
        typename : str
            Name of the DOV object type.
        pkey : str
            Permanent key of this DOV object, being a URI of the form
            `https://www.dov.vlaanderen.be/data/typename/id`.

        Raises
        ------
        ValueError
            When `typename` or `pkey` is None.
        RuntimeError
            When the defined fields of this type are invalid.

        """
        if typename is None or pkey is None:
            raise ValueError(
                "Failed to instantiate object of class {} with typename '{}' "
                "and permkey '{}'. Typename and pkey must not be None.".format(
                    self.__class__.__name__, typename, pkey))

        self.typename = typename
        self.pkey = pkey

        for f in self.fields:
            if not isinstance(f, AbstractField):
                raise RuntimeError(
                    "Type '{}' fields should be instances of "
                    "pydov.types.fields.AbstractField, found {}.".format(
                        self.__class__.__name__, str(type(f))))

        # Every own field starts as unresolved, values are filled in from
        # WFS and XML later on.
        self.data = dict.fromkeys(
            self.get_field_names(include_subtypes=False),
            AbstractDovType._UNRESOLVED)

        # Filled by `_parse_subtypes`. It has to start out empty:
        # `get_df_array` treats an empty mapping as "no subtype data" and
        # then emits exactly one row for this object.
        self.subdata = {}

        self.data['pkey_{}'.format(self.typename)] = self.pkey

    def _parse_xml_data(self, session=None):
        """Get remote XML data for this DOV object, parse the raw XML and
        save the results in the data object.

        Parameters
        ----------
        session : requests.Session
            Session to use to perform HTTP requests for data. Defaults to None,
            which means a new session will be created for each request.

        Returns
        -------
        success : boolean
            Whether or not the XML data could be fetched and parsed. A
            warning is issued when it could not.

        """
        try:
            xml = self._get_xml_data(session)
        except RemoteFetchError:
            warnings.warn(("Failed to fetch remote XML document for "
                           "object '{}'. Resulting dataframe will be "
                           "incomplete.".format(self.pkey)), XmlFetchWarning)
            return False

        try:
            tree = parse_dov_xml(xml)

            for field in self.get_fields(source=('xml',),
                                         include_subtypes=False).values():
                self.data[field['name']] = self._parse(
                    func=tree.findtext,
                    xpath=field['sourcefield'],
                    namespace=None,
                    returntype=field.get('type', None)
                )

            for field in self.get_fields(source=('custom_xml',),
                                         include_subtypes=False).values():
                # NOTE(review): `or` maps every falsy result (0, False, '')
                # to NaN, not only None - confirm this is intended.
                self.data[field['name']] = field.calculate(
                    self.__class__, tree) or np.nan

            self._parse_subtypes(xml)
            return True
        except XmlParseError:
            warnings.warn(
                ("Failed to parse XML for object '{}'. Resulting "
                 "dataframe will be incomplete.").format(self.pkey),
                XmlParseWarning)
            return False

    @classmethod
    def from_wfs_element(cls, feature, namespace):
        """Build an instance of this type from a WFS feature element.

        Parameters
        ----------
        feature : etree.Element
            XML element representing a single record of the WFS layer.
        namespace : str
            Namespace associated with this WFS featuretype.

        Returns
        -------
        cls
            An instance of this class populated with the data from the WFS
            element.

        """
        if cls.pkey_fieldname is not None:
            pkey = feature.findtext(
                './{{{}}}{}'.format(namespace, cls.pkey_fieldname))
        else:
            pkey = feature.get('{http://www.opengis.net/gml/3.2}id')

        # The constructor is called with the pkey only, so concrete
        # subclasses are presumably expected to define `__init__(self, pkey)`
        # and supply the typename themselves.
        instance = cls(pkey)

        for field in cls.get_fields(source=('wfs',)).values():
            if owsutil.has_geom_support() and field['type'] == 'geometry':
                # Geometries are converted from the element itself instead
                # of from its text.
                instance.data[field['name']] = cls._parse(
                    func=feature.find,
                    xpath=field['sourcefield'],
                    namespace=namespace,
                    returntype='geometry'
                )
            else:
                instance.data[field['name']] = cls._parse(
                    func=feature.findtext,
                    xpath=field['sourcefield'],
                    namespace=namespace,
                    returntype=field.get('type', str)
                )

        # First make all WFS fields required by the custom fields available
        # in the data of the instance...
        for field in cls.get_fields(source=('custom_wfs',)).values():
            for required_field in field.requires_wfs_fields():
                instance.data[required_field] = cls._parse(
                    func=feature.findtext,
                    xpath=required_field,
                    namespace=namespace,
                    returntype=field.get('type', str)
                )

        # ... and only then calculate the custom fields themselves.
        for field in cls.get_fields(source=('custom_wfs',)).values():
            # NOTE(review): `or` maps every falsy result (0, False, '') to
            # NaN, not only None - confirm this is intended.
            instance.data[field['name']] = field.calculate(instance) or np.nan

        return instance

    @classmethod
    def from_wfs(cls, response, namespace):
        """Build instances of this type from a WFS response.

        Parameters
        ----------
        response : str or bytes or etree.Element or iterable of etree.Element
            WFS response containing GML features.

            Can either be a GML `str` or `byte` sequence, in which case it
            will be parsed and scanned for `wfs20:member`.

            Can also be a single instance of `etree.Element` containing the
            parsed GML response.

            It can also be a list, tuple, set or generator of
            `etree.Element`, in which case it will be looped over.
        namespace : str
            Namespace associated with this WFS featuretype.

        Yields
        ------
            An instance of this type for each record in the WFS response.

        """
        if isinstance(response, str):
            response = response.encode('utf-8')

        if isinstance(response, bytes):
            response = etree.fromstring(response)

        # Determined from an actual element, whatever element class the
        # imported etree implementation uses.
        element_type = type(etree.Element(b'xml'))

        if isinstance(response, element_type):
            feature_members = response.findall(
                './/{http://www.opengis.net/wfs/2.0}member')

            for member in feature_members:
                # The feature is the first child of the member element.
                yield cls.from_wfs_element(member[0], namespace)

        if isinstance(response, (list, tuple, set, types.GeneratorType)):
            for el in response:
                yield cls.from_wfs_element(el, namespace)

    @classmethod
    def get_field_names(cls, return_fields=None, include_subtypes=True,
                        include_wfs_injected=False, include_geometry=False):
        """Return the names of the fields available for this type.

        Parameters
        ----------
        return_fields : pydov.types.fields.ReturnFieldList or None
            Fields to include in the result. When given, the names are
            returned in the order of `return_fields` and
            `include_wfs_injected` is ignored. Defaults to None, which will
            include all fields in the default order of the datatype.
        include_subtypes : boolean
            Whether to include fields defined in subtypes (True) or not (
            False). Defaults to True.
        include_wfs_injected : boolean
            Whether to include fields defined in WFS only, not in the
            default dataframe for this type. Defaults to False.
        include_geometry : boolean
            Whether to include geometry fields. Defaults to False.

        Returns
        -------
        list of str
            List of the field names available for this type. These are also
            the names of the columns in the output dataframe for this type.

        Raises
        ------
        AttributeError
            If `return_fields` is neither None nor an instance of
            pydov.types.fields.ReturnFieldList.

        pydov.util.errors.InvalidFieldError
            If at least one of the fields listed in `return_fields` is unknown.

        """
        if return_fields is None:
            if include_wfs_injected:
                fields = [f['name'] for f in cls.fields
                          if f['type'] != 'geometry' or include_geometry]
            else:
                fields = [f['name'] for f in cls.fields
                          if not f.get('wfs_injected', False) and (
                              f['type'] != 'geometry' or include_geometry)]

            if include_subtypes:
                for st in cls.subtypes:
                    fields.extend(st.get_field_names())

            return fields

        if not isinstance(return_fields, ReturnFieldList):
            raise AttributeError(
                'return_fields should be an instance of '
                'pydov.types.fields.ReturnFieldList')

        # Explicitly requested fields may also be WFS-injected fields.
        cls_fields = [f['name'] for f in cls.fields
                      if f['type'] != 'geometry' or include_geometry]

        if include_subtypes:
            for st in cls.subtypes:
                cls_fields.extend(st.get_field_names())

        for rf in return_fields:
            if rf.name not in cls_fields:
                raise InvalidFieldError(
                    "Unknown return field: '{}'".format(rf.name))

        return [rf.name for rf in return_fields]

    @classmethod
    def get_fields(cls, source=('wfs', 'xml'), include_subtypes=True):
        """Return the metadata of the fields available for this type.

        Parameters
        ----------
        source : list of str or tuple of str or iterable of str
            A list of sources to include in the output. The sources used in
            this module are `wfs`, `xml`, `custom_wfs` and `custom_xml`.
            Defaults to (`wfs`, `xml`).
        include_subtypes : boolean
            Whether to include fields defined in subtypes (True) or not (
            False). Subtype fields are only included when `xml` is one of
            the requested sources. Defaults to True.

        Returns
        -------
        collections.OrderedDict mapping str to dict
            Ordered dictionary mapping the field (column) name to the
            dictionary containing the metadata of this field.

            This metadata dictionary includes at least:

            name (str)
                The name of the field in the output data.

            source (str)
                The source of the field (either `wfs`, `xml` or `custom`).

            type (str)
                Datatype of the output data field (one of `string`, `float`,
                `integer`, `date`, `datetime`, `boolean`).

            The metadata dictionary additionally includes for fields with
            source `xml` or `wfs`:

            sourcefield (str)
                The name of the field in the source (source + sourcefield
                identify the origin of the data).

            The metadata dictionary additionally includes for fields with
            source `xml` or `custom`:

            definition (str)
                The definition of the field.

            notnull (boolean)
                Whether the field is mandatory (True) or can be null (False).

        """
        fields = OrderedDict(
            (f['name'], f) for f in cls.fields if f['source'] in source)

        if include_subtypes and 'xml' in source:
            for st in cls.subtypes:
                fields.update(st.get_fields())

        return fields

    @classmethod
    def get_xsd_schemas(cls):
        """Get a set of distinct XSD schema URLs for this type and its
        subtypes.

        Returns
        -------
        set of str
            A set of XSD schema URLs.

        """
        # `source` has to be a collection of source names: with a bare
        # string the membership test in `get_fields` would silently turn
        # into a substring test.
        fields = cls.get_fields(source=('xml',), include_subtypes=True)

        # The schema is read from fields that declare an 'xsd_type'.
        return {f['xsd_schema'] for f in fields.values() if 'xsd_type' in f}

    @classmethod
    def to_df_array(cls, iterable, return_fields=None):
        """Returns a dataframe array with one or more arrays (rows) for each
        instance in the given iterable.

        The `get_df_array` calls of the instances are executed through a
        LocalSessionThreadPool.

        Parameters
        ----------
        iterable : list or tuple or iterable of DovType
            A list of instances of a DOV type.
        return_fields : pydov.types.fields.ReturnFieldList or None
            Fields to include in the data array. Defaults to None, which
            will include all fields.

        Returns
        -------
        list of list
            Dataframe contents in the format of a twodimensional list (rows)
            of lists (columns). The values in the second list are in the
            same order as the field/column names, for inclusion in the
            resulting Pandas dataframe of a search operation.

        """
        def unnest_result(result, df_result):
            """Unnest the result into multiple rows (lists) if necessary. Rows
            are appended to the df_result list."""
            if result is not None and len(result) > 0:
                if isinstance(result[0], list):
                    # A list of rows: add them one by one.
                    df_result.extend(result)
                else:
                    # A single row.
                    df_result.append(result)

        pool = LocalSessionThreadPool()

        for item in iterable:
            pool.execute(item.get_df_array, (return_fields,))

        df_result = []
        for res in pool.join():
            unnest_result(res.get_result(), df_result)

        return df_result

    def _get_xml_data(self, session=None):
        """Return the raw XML data for this DOV object.

        The data is requested from `pydov.cache` when a cache is set. It is
        downloaded directly otherwise, in which case the `xml_downloaded`
        hooks are executed from here.

        Parameters
        ----------
        session : requests.Session
            Session to use to perform HTTP requests for data. Defaults to None,
            which means a new session will be created for each request.

        Returns
        -------
        xml : bytes
            The raw XML data of this DOV object as bytes.

        """
        if pydov.cache:
            return pydov.cache.get(self.pkey + '.xml', session)

        xml = get_dov_xml(self.pkey + '.xml', session)
        HookRunner.execute_xml_downloaded(self.pkey)
        return xml

    def _parse_subtypes(self, xml):
        """Parse the subtypes with the given XML data.

        The parsed subtype instances are appended to the list kept in
        `subdata` under the name of their subtype.

        Parameters
        ----------
        xml : bytes
            The raw XML data of the DOV object as bytes.

        """
        for subtype in self.subtypes:
            subitems = self.subdata.setdefault(subtype.get_name(), [])
            subitems.extend(subtype.from_xml(xml))

    def get_df_array(self, return_fields=None, session=None):
        """Return the data array of the instance of this type for inclusion
        in the resulting output dataframe of a search operation.

        The XML document of the object is only fetched when it is needed:
        either because fields of a subtype are requested or because one of
        the requested fields is still unresolved.

        Parameters
        ----------
        return_fields : pydov.types.fields.ReturnFieldList or None
            Fields to include in the data array. Defaults to None, which
            will include all fields.
        session : requests.Session
            Session to use to perform HTTP requests for data. Defaults to None,
            which means a new session will be created for each request.

        Returns
        -------
        list of list
            One list (row) per record of this instance, each holding the
            values in the same order as the field/column names, for
            inclusion in the result dataframe of a search operation. Values
            that could not be resolved are returned as `numpy.nan`.

        """
        fields = self.get_field_names(return_fields, include_geometry=True)

        if len(fields) == 0:
            # Fall back to the WFS-injected fields when the default
            # selection turns out to be empty.
            fields = self.get_field_names(
                return_fields, include_wfs_injected=True,
                include_geometry=False)

        # Requested fields that are not defined on the main type itself have
        # to come from a subtype, and those are only available in the XML.
        ownfields = self.get_field_names(include_subtypes=False,
                                         include_wfs_injected=True,
                                         include_geometry=True)
        subfields = [f for f in fields if f not in ownfields]
        parsed = None

        if len(subfields) > 0:
            parsed = self._parse_xml_data(session)

        datadicts = []

        if len(self.subdata) == 0 or len(subfields) == 0:
            datadicts.append(self.data)
        else:
            for subtype in self.subdata:
                if len(self.subdata[subtype]) == 0:
                    datadicts.append(self.data)
                else:
                    # One row per subtype record, repeating the data of the
                    # main type.
                    for subdata in self.subdata[subtype]:
                        datadict = {}
                        datadict.update(self.data)
                        datadict.update(subdata.data)
                        datadicts.append(datadict)

        datarecords = [[d.get(field, np.nan) for field in fields]
                       for d in datadicts]

        # The XML has not been requested yet but at least one value is still
        # unresolved: fetch it now (once) and rebuild the records.
        if parsed is None and any(
                self._UNRESOLVED in record for record in datarecords):
            parsed = self._parse_xml_data(session)
            if parsed is True:
                datarecords = self.get_df_array(return_fields, session)

        return [[c if c != self._UNRESOLVED else np.nan for c in r]
                for r in datarecords]