Source code for pydov.util.location

# -*- coding: utf-8 -*-
"""Module grouping classes for location based filters used for searching.

This module is designed to comply with the WFS 2.0.0 standard, implying
Filter Encoding 2.0 and GML 3.2.

"""
import os
from collections import OrderedDict
from packaging.version import Version
from io import BytesIO
import random
import string

from owslib.etree import etree
from owslib.fes2 import Or


[docs] class EpsgValidator(object): """Class grouping methods for enforcing and validating EPSG parameters.""" def _is_existing_epsg(self, epsg): """Check whether the provided EPSG code is a valid EPSG code according to pyproj. Pass if pyproj is not installed. Parameters ---------- epsg : int The EPSG code to check. Raises ------ ValueError When the given EPSG is invalid according to pyproj. """ try: from pyproj import CRS from pyproj.exceptions import CRSError CRS.from_epsg(epsg) except ImportError: pass except CRSError: raise ValueError def _validate_epsg(self, epsg): """ Validate the provided EPSG code, raising an exception if the EPSG code is either missing or invalid. Parameters ---------- epsg : int or None The EPSG code to validate. Raises ------ TypeError If `epsg` is None not an integer, with a message explaining how to provide a valid code. ValueError If `epsg` is invalid, with a message explaining how to provide a valid code. Notes ----- https://epsg.io for a list of valid EPSG codes. """ generic_error = (f"Example usage: {self.__class__.__name__}" "(..., epsg=3812)\n" "Useful EPSG codes:\n" " - 31370: Belgian Lambert 72\n" " - 3812: Belgian Lambert 2008\n" " - 4326: WGS84 (GPS lon/lat)\n" "Refer to https://epsg.io for other valid codes.") if epsg is None: raise TypeError("Missing required parameter 'epsg'.\n" + generic_error) if not isinstance(epsg, int): raise TypeError("EPSG code must be an integer, " f"got {type(epsg).__name__}.\n" + generic_error) try: self._is_existing_epsg(epsg) except ValueError: raise ValueError(f"Invalid EPSG code: {epsg}.\n" + generic_error) \ from None
[docs] class AbstractLocation(EpsgValidator): """Abstract base class for location types (f.ex. point, box, polygon). Locations are GML elements, for inclusion in the WFS GetFeature request. As described in the Filter Encoding 2.0 standard, locations are expressed using GML 3.2. The initialisation should require all necessary parameters to construct a valid location of this type: i.e. all locations should be valid after initialisation. """ def _get_id_seed(self): """Get the seed for generating a random but stable GML ID for this location. Should return the same value for locations considered equal. """ raise NotImplementedError('This should be implemented in a subclass.') def _get_id(self): random.seed(self._get_id_seed()) random_id = ''.join(random.choice( string.ascii_letters + string.digits) for x in range(8)) return f'pydov.{random_id}'
[docs] def get_element(self): """Return the GML representation of the location. Returns ------- etree.Element XML element of the GML representation of this location. """ raise NotImplementedError('This should be implemented in a subclass.')
[docs] class AbstractLocationFilter(object): """Abstract base class for location filters (f.ex. within, dwithin). Location filters are ogc:SpatialOpsType elements, for inclusion in the WFS GetFeature request. The initialisation should accept at least a (subclass of) AbstractLocation and require all additional parameters that are required for a valid location filter of this type. One exception is the name of the geometry column to query: this is set automatically upon building the WFS GetFeature request using the `set_geometry_column` method. """
[docs] def set_geometry_column(self, geometry_column): """Set the name of the geometry column to query. Parameters ---------- geometry_column : str The name of the geometry column to query. """ raise NotImplementedError('This should be implemented in a subclass.')
[docs] def toXML(self): """Return the XML representation of the location filter. Should raise a RuntimeError when called before the geometry column is set through `set_geometry_column`: location filters without the geometry column name are invalid. Returns ------- etree.Element XML element of this location filter. Raises ------ RuntimeError When called before the geometry column name is set: location filters without the geometry column name are invalid. """ raise NotImplementedError('This should be implemented in a subclass.')
[docs] class AbstractBinarySpatialFilter(AbstractLocationFilter): """Class representing a binary spatial filter. Binary spatial filters are ogc:BinarySpatialOpType elements, i.e. one of Equals, Disjoint, Touches, Within, Overlaps, Crosses, Intersects or Contains. """ def __init__(self, type, location): """Initialise a Binary spatial filter. Parameters ---------- type : str Type of this filter: one of Equals, Disjoint, Touches, Within, Overlaps, Crosses, Intersects or Contains. location : AbstractLocation An instance of a location to use as location for the spatial filter. """ self.location = location self.geom_column = '' self.element = etree.Element( '{{http://www.opengis.net/fes/2.0}}{}'.format(type)) geom = etree.Element('{http://www.opengis.net/fes/2.0}ValueReference') geom.text = self.geom_column self.element.append(geom) self.element.append(location.get_element())
[docs] def set_geometry_column(self, geometry_column): self.geom_column = geometry_column geom = self.element.find( './/{http://www.opengis.net/fes/2.0}ValueReference') geom.text = geometry_column
[docs] def toXML(self): if self.geom_column == '': raise RuntimeError('Geometry column has not been set. Use ' '"set_geometry_column" to set it.') return self.element
[docs] class Box(AbstractLocation): """Class representing a box location, also known as bounding box, envelope, extent.""" def __init__(self, minx, miny, maxx, maxy, epsg=None): """Initialise a Box. To initialise a Box using GPS coordinates in decimal degrees, use epsg=4326 and enter the longitude range as `minx` and `maxx` and the latitude range as `miny` and `maxy`. Parameters ---------- minx : float X coordinate of the lower left corner of the box. miny : float Y coordinate of the lower left corner of the box. maxx : float X coordinate of the upper right corner of the box. maxy : float Y coordinate of the upper right corner of the box. epsg : int EPSG code of the coordinate reference system (CRS) of the coordinates specified in `minx`, `miny`, `maxx`, `maxy`. Raises ------ TypeError If `epsg` is None, missing or not an integer. ValueError If `maxx` is lower than or equal to `minx`. If `maxy` is lower than or equal to `miny`. If `epsg` is invalid Notes ----- See https://epsg.io for a list of valid EPSG codes. """ self._validate_epsg(epsg) if maxx <= minx: raise ValueError("MaxX should be greater than MinX.") if maxy <= miny: raise ValueError("MaxY should be greater than MinY.") self.minx = minx self.miny = miny self.maxx = maxx self.maxy = maxy self.epsg = epsg self.element = etree.Element( '{http://www.opengis.net/gml/3.2}Envelope') self.element.set('srsDimension', '2') self.element.set( 'srsName', 'http://www.opengis.net/gml/srs/epsg.xml#{:d}'.format(epsg)) self.element.set('{http://www.opengis.net/gml/3.2}id', self._get_id()) lower_corner = etree.Element( '{http://www.opengis.net/gml/3.2}lowerCorner') lower_corner.text = '{:.06f} {:.06f}'.format(self.minx, self.miny) self.element.append(lower_corner) upper_corner = etree.Element( '{http://www.opengis.net/gml/3.2}upperCorner') upper_corner.text = '{:.06f} {:.06f}'.format(self.maxx, self.maxy) self.element.append(upper_corner) def _get_id_seed(self): return ','.join(str(i) for i in [ self.minx, self.miny, self.maxx, self.miny, self.epsg ])
[docs] def get_element(self): return self.element
[docs] class Point(AbstractLocation): """Class representing a point location.""" def __init__(self, x, y, epsg=None): """Initialise a Point. To initialise a Point using GPS coordinates in decimal degrees, use epsg=4326 and enter the longitude as `x` and the latitude as `y`. Parameters ---------- x : float X coordinate (or longitude) of the point. y : float Y coordinate (or latitude) of the point. epsg : int EPSG code of the coordinate reference system (CRS) of the coordinates specified in `x`, `y`. Raises ------ TypeError If `epsg` is None, missing or not an integer. ValueError If `epsg` invalid. Notes ----- See https://epsg.io for a list of valid EPSG codes. """ self._validate_epsg(epsg) self.x = x self.y = y self.epsg = epsg self.element = etree.Element('{http://www.opengis.net/gml/3.2}Point') self.element.set('srsDimension', '2') self.element.set( 'srsName', 'http://www.opengis.net/gml/srs/epsg.xml#{:d}'.format(epsg)) self.element.set('{http://www.opengis.net/gml/3.2}id', self._get_id()) coordinates = etree.Element('{http://www.opengis.net/gml/3.2}pos') coordinates.text = '{:.06f} {:.06f}'.format(self.x, self.y) self.element.append(coordinates) def _get_id_seed(self): return ','.join(str(i) for i in [ self.x, self.y, self.epsg ])
[docs] def get_element(self): return self.element
[docs] class GmlObject(AbstractLocation): """Class representing a raw GML3.2 location, f.ex. gml32:Surface or gml32:MultiSurface.""" def __init__(self, gml_element): """Initialise a GmlObject. Initialise a GmlObject from an existing XML element representing a GML3.2 location. Parameters ---------- gml_element : etree.Element or str or bytes XML element of the GML3.2 location, either as etree.Element, bytes or string representation. """ if isinstance(gml_element, str): self.element = etree.fromstring(gml_element.encode('utf8')) elif isinstance(gml_element, bytes): self.element = etree.fromstring(gml_element) else: self.element = gml_element if self.element.find('.//{http://www.opengis.net/gml/3.2}*') is None: if self.element.find( './/{http://www.opengis.net/gml}*') is not None: raise ValueError( 'GML element should be GML version 3.2, it appears to be ' 'older.') raise ValueError('GML element appears not to be valid GML3.2') if not self.element.attrib.get("srsName"): raise ValueError("GML element is missing the attribute srsName") try: epsg = int(self.element.attrib.get("srsName").split(':')[-1]) self._is_existing_epsg(epsg) except ValueError: raise ValueError("GML element has an invalid attribute srsName")
[docs] def get_element(self): return self.element
[docs] class Equals(AbstractBinarySpatialFilter): """Class representing a spatial Equals filter. A spatial Equals will return all points that are equal to another point. """ def __init__(self, location): """Initialise an Equals filter. Parameters ---------- location : AbstractLocation An instance of a location to use as location for the Equals filter. """ super(Equals, self).__init__('Equals', location)
[docs] class Disjoint(AbstractBinarySpatialFilter): """Class representing a spatial Disjoint filter. A spatial Disjoint will return all points that are disjoint from a polygon or box location: i.e. that are not inside nor on the boundary. A spatial Disjoint is the inverse of a spatial Intersects. """ def __init__(self, location): """Initialise a Disjoint filter. Parameters ---------- location : AbstractLocation An instance of a location to use as location for the Disjoint filter. """ super(Disjoint, self).__init__('Disjoint', location)
[docs] class Touches(AbstractBinarySpatialFilter): """Class representing a spatial Touches filter. A spatial Touches will return all points that touch a polygon or box location: i.e. that are on the boundary but not inside. """ def __init__(self, location): """Initialise a Touches filter. Parameters ---------- location : AbstractLocation An instance of a location to use as location for the Touches filter. """ super(Touches, self).__init__('Touches', location)
[docs] class Within(AbstractBinarySpatialFilter): """Class representing a spatial Within filter. A spatial Within will return all points that are entirely within a polygon or box location (i.e. are not on the boundary). """ def __init__(self, location): """Initialise a Within filter. Parameters ---------- location : AbstractLocation An instance of a location to use as location for the Within filter. """ super(Within, self).__init__('Within', location)
[docs] class Intersects(AbstractBinarySpatialFilter): """Class representing a spatial Intersects filter. A spatial Intersects will return all points that are within or on the boundary of a polygon or box location. A spatial Intersects is the inverse of a spatial Disjoint. """ def __init__(self, location): """Initialise an Intersects filter. Parameters ---------- location : AbstractLocation An instance of a location to use as location for the Intersects filter. """ super(Intersects, self).__init__('Intersects', location)
[docs] class WithinDistance(AbstractLocationFilter): """Class representing a spatial DWithin filter. A spatial DWithin will return all points that are within a given distance of a certain location. """ def __init__(self, location, distance, distance_unit='meter'): """Initialise a WithinDistance filter. Parameters ---------- location : AbstractLocation An instance of a location to use as location for the WithinDistance filter. distance : float Amount of distance units to use for the filter. distance_unit : string, optional, defaults to 'meter' The distance unit of the value of `distance`. """ self.location = location self.distance = distance self.distance_unit = distance_unit self.geom_column = '' self.element = etree.Element('{http://www.opengis.net/fes/2.0}DWithin') geom = etree.Element('{http://www.opengis.net/fes/2.0}ValueReference') geom.text = self.geom_column distance = etree.Element('{http://www.opengis.net/fes/2.0}Distance') distance.set('units', self.distance_unit) distance.text = '{:.06f}'.format(self.distance) self.element.append(geom) self.element.append(location.get_element()) self.element.append(distance)
[docs] def set_geometry_column(self, geometry_column): self.geom_column = geometry_column geom = self.element.find( './/{http://www.opengis.net/fes/2.0}ValueReference') geom.text = geometry_column
[docs] def toXML(self): if self.geom_column == '': raise RuntimeError('Geometry column has not been set. Use ' '"set_geometry_column" to set it.') return self.element
[docs] class GmlFilter(AbstractLocationFilter): """Class for construction a spatial filter expression from a GML3.2 document. """ def __init__(self, gml, location_filter, location_filter_kwargs=None, combinator=Or): """Initialise a spatial filter expression from a GML3.2 string. Parameters ---------- gml : str Either a string representation of the GML document to parse, or a path to a GML3.2 file on disk. location_filter : class<AbstractLocationFilter> Location filter to use for the geometries in the GML document. location_filter_kwargs : dict, optional Keyword-based arguments to pass to the `location_filter` on initialisation (with the exception of the `location` parameter, which is automatically parsed from the GML). Can be skipped in cases where the location_filter takes no extra arguments besides location. combinator : class<BinaryLogicOpType>, optional, defaults to Or One of (Or, And) used to combine filters for different geometries in the GML document. Raises ------ ValueError When no geometries could be parsed from the given GML record. """ self.gml = gml self.subelements = OrderedDict() if location_filter_kwargs is None: location_filter_kwargs = {} self._parse_gml() if len(self.subelements) == 1: self.element = location_filter( GmlObject(list(self.subelements)[0]), **location_filter_kwargs) else: self.element = combinator( [location_filter(GmlObject(e), **location_filter_kwargs) for e in self.subelements]) def _dedup_multi(self, tree, xpath_single, xpath_multi): """Parse single and multi* geometries from the same type from the GML tree. Deduplicates in the sense that single geometries inside a multi* geometry will be discarded in favor of the latter. Parameters ---------- tree : etree.ElementTree XML tree of the GML to parse. xpath_single : str XPath of the single geometries to parse. xpath_multi : str XPath of the multi* geometries to parse. Returns ------- single, multi : set of etree.Element, set of etree.Element Sets of the parsed single and multi geometry Elements. """ single = OrderedDict.fromkeys(tree.findall(xpath_single)) multi = OrderedDict.fromkeys(tree.findall(xpath_multi)) for m in multi: s_in_m = OrderedDict.fromkeys(m.findall(xpath_single)) single = OrderedDict.fromkeys(e for e in single if e not in s_in_m) return single, multi def _parse_gml(self): """Checks the gml parameter and tries parsing the file. Raises ------ ValueError When the file could not be parsed. """ gml_tree = None try: if isinstance(self.gml, str): gml_tree = etree.fromstring(self.gml.encode('utf8')) elif isinstance(self.gml, bytes): gml_tree = etree.fromstring(self.gml) except etree.ParseError as error: if os.path.isfile(self.gml): with open(self.gml, 'r') as gml_file: gml_tree = etree.fromstring(gml_file.read().encode('utf8')) else: raise error finally: if gml_tree is not None: self._parse_gml_tree(gml_tree) else: raise ValueError('Failed to parse GML3.2 file.') def _parse_gml_tree(self, gml_tree): """Parse the GML tree and add subelements for geometries. Parameters ---------- gml_tree : etree.ElementTree XML tree of the GML to parse. Raises ------ ValueError When no geometries could be parsed from the given GML record. """ points, multipoints = self._dedup_multi( gml_tree, './/{http://www.opengis.net/gml/3.2}Point', './/{http://www.opengis.net/gml/3.2}MultiPoint' ) self.subelements.update(points) self.subelements.update(multipoints) linestrings, multicurves = self._dedup_multi( gml_tree, './/{http://www.opengis.net/gml/3.2}LineString', './/{http://www.opengis.net/gml/3.2}MultiCurve' ) self.subelements.update(linestrings) self.subelements.update(multicurves) polygons, multisurfaces = self._dedup_multi( gml_tree, './/{http://www.opengis.net/gml/3.2}Polygon', './/{http://www.opengis.net/gml/3.2}MultiSurface') self.subelements.update(polygons) self.subelements.update(multisurfaces) if len(self.subelements) == 0: if gml_tree.find('.//{http://www.opengis.net/gml}*') is not None: raise ValueError( 'GML document should be GML version 3.2, it appears to be ' 'older.') raise ValueError('Failed to extract geometries from GML3.2 ' 'document, is this a valid GML3.2 document?')
[docs] def set_geometry_column(self, geometry_column): if len(self.subelements) == 1: self.element.set_geometry_column(geometry_column) else: for sub_element in self.element.operations: sub_element.set_geometry_column(geometry_column)
[docs] def toXML(self): return self.element.toXML()
[docs] class GeometryFilter(GmlFilter): """Class for construction a spatial filter expression from any Geometry Fiona can convert to a GML 3.2 document. This class requires the ``fiona`` package to be installed. """ def __init__(self, geometry, location_filter, location_filter_kwargs=None, combinator=Or): """Initialise a spatial filter expression from a given Geometry. Parameters ---------- geometry : str A path to a vector file on disk. location_filter : class<AbstractLocationFilter> Location filter to use for the geometries in the GML document. location_filter_kwargs : dict, optional Keyword-based arguments to pass to the `location_filter` on initialisation (with the exception of the `location` parameter, which is automatically parsed from the geomery). Can be skipped in cases where the location_filter takes no extra arguments besides location. combinator : class<BinaryLogicOpType>, optional, defaults to Or One of (Or, And) used to combine filters for different geometries in the geometry document. Raises ------ ValueError When no geometries could be parsed from the given geometry. """ gml = self._parse_geometry(geometry) super().__init__(gml, location_filter, location_filter_kwargs, combinator) @staticmethod def _parse_geometry(geometry): """Checks the geometry and parse the file if possible. Raises ------ ValueError When the file could not be parsed. """ try: import fiona # Check if the GML driver and write mode are supported and check # if version of Fiona supports GML 3.2 output (Fiona >= 1.8.18) drivers = fiona.supported_drivers if 'w' not in drivers.get('GML', '') or \ Version(fiona.__version__) < Version('1.8.18'): raise UserWarning('Please use module Fiona version 1.8.18 ' 'or higher') except ImportError: raise ImportError('Failed to import fiona. GeometryFilter ' 'requires fiona to be installed.') else: # Open the geometry as a collection containing multiple records with fiona.open(geometry, 'r') as c: gml_blob = BytesIO() # Assume the geometry is loaded from file layer = os.path.basename(os.path.splitext(geometry)[0]) with fiona.open(gml_blob, 'w', layer=layer, driver='GML', schema=c.schema, crs=c.crs, FORMAT='GML3.2') as gml: for r in c: gml.write(r) gml_blob.seek(0) # Set the gml attribute used by the parent class gml = gml_blob.read() return gml
[docs] class GeopandasFilter(GmlFilter): """Class for construction a spatial filter expression from a GeoPandas object. This class requires the ``geopandas`` package to be installed. """ def __init__(self, geodataframe, location_filter, location_filter_kwargs=None, combinator=Or): """Initialise a spatial filter expression from a GeoPandas GeoDataFrame. Parameters ---------- geodataframe : GeoPandas.GeoDataFrame A GeoDataFrame with a valid crs. location_filter : class<AbstractLocationFilter> Location filter to use for the geometries in the GML document. location_filter_kwargs : dict, optional Keyword-based arguments to pass to the `location_filter` on initialisation (with the exception of the `location` parameter, which is automatically parsed from the geomery). Can be skipped in cases where the location_filter takes no extra arguments besides location. combinator : class<BinaryLogicOpType>, optional, defaults to Or One of (Or, And) used to combine filters for different geometries in the geometry document. Raises ------ ValueError When no geometries could be parsed from the given geometry. """ gml = self._parse_geometry(geodataframe) super().__init__(gml, location_filter, location_filter_kwargs, combinator) @staticmethod def _parse_geometry(gdf): """Checks the geometry and parse the file if possible. Raises ------ ValueError When the file could not be parsed. """ try: import geopandas except ImportError: raise ImportError('Failed to import geopandas. GeopandasFilter ' 'requires geopandas to be installed.') else: if not isinstance(gdf, geopandas.GeoDataFrame): raise TypeError("pydov only supports geopandas.GeoDataFrame " "to define a spatial query.") if not gdf.crs: raise AttributeError("geopandas.GeoDataFrame is missing a " "CRS definition") with BytesIO() as gml_blob: gdf.to_file(gml_blob, driver="GML", FORMAT="GML3.2", layer="geodataframe") gml_blob.seek(0) gml = gml_blob.read() return gml