+import json
import math
+from abc import abstractmethod
+from typing import Optional, Tuple
-import fiona.crs
+import fiona.crs # pip install Fiona
import fiona.transform
+import owslib.crs
+import owslib.wms
import rasterio
import rasterio.sample
+from rasterio import DatasetReader
from wrpylib.wrvalidators import LonLat
LON_LAT_CRS = fiona.crs.from_epsg(4326)
-def get_ele_from_raster(filename: str, lon_lat: LonLat, band: int = 1) -> float:
+XY = Tuple[float, float]
+
+
+def get_ele_from_raster(dataset: DatasetReader, x_y: XY, band: int = 1, crs: str = None) -> Optional[float]:
"""
Examples of filenames:
* '/path/to/file.tif'
* 'file:///path/to/file.tif'
* 'zip:///path/to/file.zip!/folder/file.tif'
* 'zip+file:///path/to/file.zip!/folder/file.tif'
+
+ Usage:
+ >>> lon_lat = LonLat(11.2, 47.8)
+ >>> filename = '/path/to/file.tif'
+ >>> with rasterio.open(filename) as dataset:
+ >>> crs = dataset.crs.to_proj4()
+ >>> x_y = transform_lon_lat(lon_lat, crs)
+ >>> ele = get_ele_from_raster(dataset, x_y)
"""
- with rasterio.open(filename) as dataset:
- xs, ys = fiona.transform.transform(LON_LAT_CRS, dataset.crs.to_proj4(), [lon_lat.lon], [lon_lat.lat])
- xy_list = [(x, y) for x, y in zip(xs, ys)]
- ele = list(rasterio.sample.sample_gen(dataset, xy_list, band))
- return None if ele is None else float(ele[0])
+ ele = list(rasterio.sample.sample_gen(dataset, [x_y], band))[0]
+ return None if ele == dataset.nodata else float(ele)
+
+
+def get_bbox_from_wms(wms: owslib.wms.WebMapService, layer_name: str) -> Tuple[Tuple[float, float, float, float], str]:
+ """Returns the bounding box in dataset CRS."""
+ layer = wms.contents[layer_name]
+ bbox_crs = layer.boundingBox
+ return bbox_crs[:4], bbox_crs[-1]
+
+
+def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]:
+ nan_value = -9999
+ bbox_width_px = 3
+ bbox_height_px = 3
+ bbox_width_m = 5.
+ bbox_height_m = bbox_width_m
+ left_bottom = _shift_lon_lat(lon_lat, -bbox_height_m / 2., -bbox_height_m / 2.)
+ right_top = _shift_lon_lat(lon_lat, bbox_height_m / 2., bbox_height_m / 2.)
+ response = wms.getfeatureinfo(
+ layers=[layer_name],
+ srs='CRS:84',
+ bbox=(left_bottom.lon, left_bottom.lat, right_top.lon, right_top.lat),
+ size=(bbox_width_px, bbox_height_px),
+ info_format='application/json',
+ xy=(bbox_width_px // 2, bbox_height_px // 2),
+ )
+ info_json_str = response.read().decode()
+ info_json = json.loads(info_json_str)
+ assert info_json['type'] == 'FeatureCollection'
+ features = info_json['features']
+ assert len(features) <= 1
+ if len(features) == 0:
+ return None
+ feature = features[0]
+ assert feature['type'] == 'Feature'
+ ele = feature['properties']['GRAY_INDEX']
+ if ele == nan_value:
+ return None
+ return ele
+
+
+def transform_lon_lat(lon_lat: LonLat, crs: str) -> XY:
+ """Transform the coordinates to dataset CRS."""
+ xs, ys = fiona.transform.transform(LON_LAT_CRS, crs, [lon_lat.lon], [lon_lat.lat])
+ return [(x, y) for x, y in zip(xs, ys)][0]
+
+
+def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
+ """Shifts a given geographic coordinate by a given metric amount"""
+ meter_per_degree = 10_000_000 / 90 # distance equator to pole is 10^7 m
+ return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
+ orig.lat + north_m / meter_per_degree)
+
+
+def _is_in_bbox(x_y: Tuple[float, float], bbox: Tuple[float, float, float, float]) -> bool:
+ x0, y0, x1, y1 = bbox
+ return x0 <= x_y[0] <= x1 and y0 <= x_y[1] <= y1
class DemBase:
- def get_ele(self, lon_lat: LonLat) -> float:
- return math.nan
+ def close(self):
+ pass
def get_name(self) -> str:
return self.__class__.__name__
+ @abstractmethod
+ def get_ele(self, lon_lat: LonLat) -> Optional[float]:
+ return None
+
+ @abstractmethod
+ def is_in_bbox(self, lon_lat: LonLat) -> bool:
+ pass
+
+
+class DemRasterBase(DemBase):
+ def __init__(self, filename: str, band: int = 1, crs: str = None):
+ self.filename = filename
+ self.band = band
+ self.dataset = rasterio.open(filename)
+ self.crs = crs if crs else self.dataset.crs.to_proj4()
+
+ def close(self):
+ self.dataset.close()
+
+ def get_ele(self, lon_lat: LonLat) -> Optional[float]:
+ x_y = transform_lon_lat(lon_lat, self.crs)
+ return get_ele_from_raster(self.dataset, x_y, self.band, self.crs)
+
+ def is_in_bbox(self, lon_lat: LonLat) -> bool:
+ bbox = self.dataset.bounds
+ x_y = transform_lon_lat(lon_lat, self.crs)
+ return _is_in_bbox(x_y, bbox)
+
-class DemBasemap(DemBase):
+class DemBasemap(DemRasterBase):
def __init__(self):
- self.filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
- # self.filename = 'zip:///home/philipp/daten/GeoData/dem/oesterreich_10m/ogd-10m-at.zip!dhm_lamb_10m.tif'
+ filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
+ super().__init__(filename)
+
+
+class DemSwitzerland(DemRasterBase):
+ def __init__(self):
+ filename = 'zip:///home/philipp/daten/GeoData/dem/schweiz_200m/dem_schweiz_200m.zip!data/DHM200.asc'
+ super().__init__(filename, crs='EPSG:21781')
+
+
+class DemBavaria(DemRasterBase):
+ def __init__(self):
+ filename = 'zip:///home/philipp/daten/GeoData/dem/bayern_50m/dgm50_epsg25832.tif.zip!dgm50_epsg25832.tif'
+ super().__init__(filename)
+
+
+class DemWmsBase(DemBase):
+ def __init__(self, url: str, wms_version: str, layer_name: str):
+ self.url = url
+ self.wms_version = wms_version
+ self.layer_name = layer_name
+ self.wms = None
+
+ def close(self):
+ pass
+
+ def get_wms(self) -> owslib.wms.WebMapService:
+ if self.wms is None:
+ self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version)
+ return self.wms
+
+ def get_ele(self, lon_lat: LonLat) -> Optional[float]:
+ wms = self.get_wms()
+ return get_ele_from_wms(lon_lat, wms, self.layer_name)
+
+ def is_in_bbox(self, lon_lat: LonLat) -> bool:
+ bbox, crs = get_bbox_from_wms(self.get_wms(), self.layer_name)
+ x_y = transform_lon_lat(lon_lat, crs)
+ return _is_in_bbox(x_y, bbox)
+
+
+class DemSouthTyrol(DemWmsBase):
+ def __init__(self):
+ url = 'http://geoservices.buergernetz.bz.it/geoserver/p_bz-elevation/ows?' \
+ 'SERVICE=WMS&request=getCapabilities'
+ super().__init__(url, '1.3.0', 'DTM-2p5m')
+
+
+class MultiDem:
+ def __init__(self):
+ self.dems = [
+ DemBasemap(),
+ DemSouthTyrol(),
+ # DemBavaria(), # too coarse - we should have a 25 m grid or denser - this is 50 m
+ # DemSwitzerland(), # too coarse - we should have a 25 m grid or denser - this is 200 m
+ ]
+
+ def close(self):
+ for dem in self.dems:
+ dem.close()
- def get_ele(self, lon_lat: LonLat) -> float:
- return get_ele_from_raster(self.filename, lon_lat)
+ def get_ele(self, lon_lat: LonLat) -> Tuple[Optional[float], Optional[str]]:
+ for dem in self.dems:
+ if dem.is_in_bbox(lon_lat):
+ ele = dem.get_ele(lon_lat)
+ if ele is not None:
+ return ele, dem.get_name()
+ return None, None