import json import math from abc import abstractmethod from typing import Optional, Tuple import fiona.crs # pip install Fiona import fiona.transform import owslib.crs import owslib.wms import rasterio import rasterio.sample from rasterio import DatasetReader from wrpylib.wrvalidators import LonLat LON_LAT_CRS = fiona.crs.from_epsg(4326) XY = Tuple[float, float] def get_ele_from_raster(dataset: DatasetReader, x_y: XY, band: int = 1, crs: str = None) -> Optional[float]: """ Examples of filenames: * '/path/to/file.tif' * 'file:///path/to/file.tif' * 'zip:///path/to/file.zip!/folder/file.tif' * 'zip+file:///path/to/file.zip!/folder/file.tif' Usage: >>> lon_lat = LonLat(11.2, 47.8) >>> filename = '/path/to/file.tif' >>> with rasterio.open(filename) as dataset: >>> crs = dataset.crs.to_proj4() >>> x_y = transform_lon_lat(lon_lat, crs) >>> ele = get_ele_from_raster(dataset, x_y) """ ele = list(rasterio.sample.sample_gen(dataset, [x_y], band))[0] return None if ele == dataset.nodata else float(ele) def get_bbox_from_wms(wms: owslib.wms.WebMapService, layer_name: str) -> Tuple[Tuple[float, float, float, float], str]: """Returns the bounding box in dataset CRS.""" layer = wms.contents[layer_name] bbox_crs = layer.boundingBox return bbox_crs[:4], bbox_crs[-1] def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]: nan_value = -9999 bbox_width_px = 3 bbox_height_px = 3 bbox_width_m = 5. bbox_height_m = bbox_width_m left_bottom = _shift_lon_lat(lon_lat, -bbox_height_m / 2., -bbox_height_m / 2.) right_top = _shift_lon_lat(lon_lat, bbox_height_m / 2., bbox_height_m / 2.) response = wms.getfeatureinfo( layers=[layer_name], srs='CRS:84', bbox=(left_bottom.lon, left_bottom.lat, right_top.lon, right_top.lat), size=(bbox_width_px, bbox_height_px), info_format='application/json', xy=(bbox_width_px // 2, bbox_height_px // 2), ) info_json_str = response.read().decode() info_json = json.loads(info_json_str) assert info_json['type'] == 'FeatureCollection' features = info_json['features'] assert len(features) <= 1 if len(features) == 0: return None feature = features[0] assert feature['type'] == 'Feature' ele = feature['properties']['GRAY_INDEX'] if ele == nan_value: return None return ele def transform_lon_lat(lon_lat: LonLat, crs: str) -> XY: """Transform the coordinates to dataset CRS.""" xs, ys = fiona.transform.transform(LON_LAT_CRS, crs, [lon_lat.lon], [lon_lat.lat]) return [(x, y) for x, y in zip(xs, ys)][0] def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat: """Shifts a given geographic coordinate by a given metric amount""" meter_per_degree = 10_000_000 / 90 # distance equator to pole is 10^7 m return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.), orig.lat + north_m / meter_per_degree) def _is_in_bbox(x_y: Tuple[float, float], bbox: Tuple[float, float, float, float]) -> bool: x0, y0, x1, y1 = bbox return x0 <= x_y[0] <= x1 and y0 <= x_y[1] <= y1 class DemBase: def close(self): pass def get_name(self) -> str: return self.__class__.__name__ @abstractmethod def get_ele(self, lon_lat: LonLat) -> Optional[float]: return None @abstractmethod def is_in_bbox(self, lon_lat: LonLat) -> bool: pass class DemRasterBase(DemBase): def __init__(self, filename: str, band: int = 1, crs: str = None): self.filename = filename self.band = band self.dataset = rasterio.open(filename) self.crs = crs if crs else self.dataset.crs.to_proj4() def close(self): self.dataset.close() def get_ele(self, lon_lat: LonLat) -> Optional[float]: x_y = transform_lon_lat(lon_lat, self.crs) return get_ele_from_raster(self.dataset, x_y, self.band, self.crs) def is_in_bbox(self, lon_lat: LonLat) -> bool: bbox = self.dataset.bounds x_y = transform_lon_lat(lon_lat, self.crs) return _is_in_bbox(x_y, bbox) class DemBasemap(DemRasterBase): def __init__(self): filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif' super().__init__(filename) class DemSwitzerland(DemRasterBase): def __init__(self): filename = 'zip:///home/philipp/daten/GeoData/dem/schweiz_200m/dem_schweiz_200m.zip!data/DHM200.asc' super().__init__(filename, crs='EPSG:21781') class DemBavaria(DemRasterBase): def __init__(self): filename = 'zip:///home/philipp/daten/GeoData/dem/bayern_50m/dgm50_epsg25832.tif.zip!dgm50_epsg25832.tif' super().__init__(filename) class DemWmsBase(DemBase): def __init__(self, url: str, wms_version: str, layer_name: str): self.url = url self.wms_version = wms_version self.layer_name = layer_name self.wms = None def close(self): pass def get_wms(self) -> owslib.wms.WebMapService: if self.wms is None: self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version) return self.wms def get_ele(self, lon_lat: LonLat) -> Optional[float]: wms = self.get_wms() return get_ele_from_wms(lon_lat, wms, self.layer_name) def is_in_bbox(self, lon_lat: LonLat) -> bool: bbox, crs = get_bbox_from_wms(self.get_wms(), self.layer_name) x_y = transform_lon_lat(lon_lat, crs) return _is_in_bbox(x_y, bbox) class DemSouthTyrol(DemWmsBase): def __init__(self): url = 'http://geoservices.buergernetz.bz.it/geoserver/p_bz-elevation/ows?' \ 'SERVICE=WMS&request=getCapabilities' super().__init__(url, '1.3.0', 'DTM-2p5m') class MultiDem: def __init__(self): self.dems = [ DemBasemap(), DemSouthTyrol(), # DemBavaria(), # too coarse - we should have a 25 m grid or denser - this is 50 m # DemSwitzerland(), # too coarse - we should have a 25 m grid or denser - this is 200 m ] def close(self): for dem in self.dems: dem.close() def get_ele(self, lon_lat: LonLat) -> Tuple[Optional[float], Optional[str]]: for dem in self.dems: if dem.is_in_bbox(lon_lat): ele = dem.get_ele(lon_lat) if ele is not None: return ele, dem.get_name() return None, None