Add Bavaria.
[philipp/winterrodeln/wrpylib.git] / wrpylib / wrdem.py
index ead79e1e4c2a65b86914ec8a81bba5957a383078..52b41d7fc54337c40883692a4e9df6d406c39875 100644 (file)
+import json
 import math
-from typing import Optional
+from abc import abstractmethod
+from collections import namedtuple
+from typing import Optional, Tuple
 
 import fiona.crs
 import fiona.transform
+import owslib.crs
+import owslib.wms
 import rasterio
 import rasterio.sample
+from rasterio import DatasetReader
 
 from wrpylib.wrvalidators import LonLat
 
 LON_LAT_CRS = fiona.crs.from_epsg(4326)
 
 
-def get_ele_from_raster(filename: str, lon_lat: LonLat, band: int = 1) -> Optional[float]:
+XY = Tuple[float, float]
+
+
+def get_ele_from_raster(dataset: DatasetReader, x_y: XY, band: int = 1, crs: str = None) -> Optional[float]:
     """
     Examples of filenames:
     * '/path/to/file.tif'
     * 'file:///path/to/file.tif'
     * 'zip:///path/to/file.zip!/folder/file.tif'
     * 'zip+file:///path/to/file.zip!/folder/file.tif'
+
+    Usage:
+    >>> lon_lat = LonLat(11.2, 47.8)
+    >>> filename = '/path/to/file.tif'
+    >>> with rasterio.open(filename) as dataset:
+    >>>     crs = dataset.crs.to_proj4()
+    >>>     x_y = transform_lon_lat(lon_lat, crs)
+    >>>     ele = get_ele_from_raster(dataset, x_y)
     """
-    with rasterio.open(filename) as dataset:
-        xs, ys = fiona.transform.transform(LON_LAT_CRS, dataset.crs.to_proj4(), [lon_lat.lon], [lon_lat.lat])
-        xy_list = [(x, y) for x, y in zip(xs, ys)]
-        ele = list(rasterio.sample.sample_gen(dataset, xy_list, band))[0]
-        return None if ele == dataset.nodata else float(ele)
+    ele = list(rasterio.sample.sample_gen(dataset, [x_y], band))[0]
+    return None if ele == dataset.nodata else float(ele)
+
+
+def get_bbox_from_wms(wms: owslib.wms.WebMapService, layer_name: str) -> Tuple[Tuple[float, float, float, float], str]:
+    """Returns the bounding box in dataset CRS."""
+    layer = wms.contents[layer_name]
+    bbox_crs = layer.boundingBox
+    return bbox_crs[:4], bbox_crs[-1]
+
+
+def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]:
+    nan_value = -9999
+    bbox_width_px = 3
+    bbox_height_px = 3
+    bbox_width_m = 5.
+    bbox_height_m = bbox_width_m
+    left_bottom = _shift_lon_lat(lon_lat, -bbox_height_m / 2., -bbox_height_m / 2.)
+    right_top = _shift_lon_lat(lon_lat, bbox_height_m / 2., bbox_height_m / 2.)
+    response = wms.getfeatureinfo(
+        layers=[layer_name],
+        srs='CRS:84',
+        bbox=(left_bottom.lon, left_bottom.lat, right_top.lon, right_top.lat),
+        size=(bbox_width_px, bbox_height_px),
+        info_format='application/json',
+        xy=(bbox_width_px // 2, bbox_height_px // 2),
+    )
+    info_json_str = response.read().decode()
+    info_json = json.loads(info_json_str)
+    assert info_json['type'] == 'FeatureCollection'
+    features = info_json['features']
+    assert len(features) <= 1
+    if len(features) == 0:
+        return None
+    feature = features[0]
+    assert feature['type'] == 'Feature'
+    ele = feature['properties']['GRAY_INDEX']
+    if ele == nan_value:
+        return None
+    return ele
+
+
+def transform_lon_lat(lon_lat: LonLat, crs: str) -> XY:
+    """Transform the coordinates to dataset CRS."""
+    xs, ys = fiona.transform.transform(LON_LAT_CRS, crs, [lon_lat.lon], [lon_lat.lat])
+    return [(x, y) for x, y in zip(xs, ys)][0]
+
+
+def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
+    """Shifts a given geographic coordinate by a given metric amount"""
+    meter_per_degree = 10_000_000 / 90  # distance equator to pole is 10^7 m
+    return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
+                  orig.lat + north_m / meter_per_degree)
+
+
+def _is_in_bbox(x_y: Tuple[float, float], bbox: Tuple[float, float, float, float]) -> bool:
+    x0, y0, x1, y1 = bbox
+    return x0 <= x_y[0] <= x1 and y0 <= x_y[1] <= y1
 
 
 class DemBase:
-    def get_ele(self, lon_lat: LonLat) -> float:
-        return math.nan
+    def close(self):
+        pass
 
     def get_name(self) -> str:
         return self.__class__.__name__
 
+    @abstractmethod
+    def get_ele(self, lon_lat: LonLat) -> Optional[float]:
+        return None
+
+    @abstractmethod
+    def is_in_bbox(self, lon_lat: LonLat) -> bool:
+        pass
+
+
+class DemRasterBase(DemBase):
+    def __init__(self, filename: str, band: int = 1, crs: str = None):
+        self.filename = filename
+        self.band = band
+        self.dataset = rasterio.open(filename)
+        self.crs = crs if crs else self.dataset.crs.to_proj4()
+
+    def close(self):
+        self.dataset.close()
+
+    def get_ele(self, lon_lat: LonLat) -> Optional[float]:
+        x_y = transform_lon_lat(lon_lat, self.crs)
+        return get_ele_from_raster(self.dataset, x_y, self.band, self.crs)
+
+    def is_in_bbox(self, lon_lat: LonLat) -> bool:
+        bbox = self.dataset.bounds
+        x_y = transform_lon_lat(lon_lat, self.crs)
+        return _is_in_bbox(x_y, bbox)
+
 
-class DemBasemap(DemBase):
+class DemBasemap(DemRasterBase):
     def __init__(self):
-        self.filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
-        # self.filename = 'zip:///home/philipp/daten/GeoData/dem/oesterreich_10m/ogd-10m-at.zip!dhm_lamb_10m.tif'
+        filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
+        super().__init__(filename)
+
+
+class DemSwitzerland(DemRasterBase):
+    def __init__(self):
+        filename = 'zip:///home/philipp/daten/GeoData/dem/schweiz_200m/dem_schweiz_200m.zip!data/DHM200.asc'
+        super().__init__(filename, crs='EPSG:21781')
+
+
+class DemBavaria(DemRasterBase):
+    def __init__(self):
+        filename = 'zip:///home/philipp/daten/GeoData/dem/bayern_50m/dgm50_epsg25832.tif.zip!dgm50_epsg25832.tif'
+        super().__init__(filename)
+
+
+class DemWmsBase(DemBase):
+    def __init__(self, url: str, wms_version: str, layer_name: str):
+        self.url = url
+        self.wms_version = wms_version
+        self.layer_name = layer_name
+        self.wms = None
+
+    def close(self):
+        pass
+
+    def get_wms(self) -> owslib.wms.WebMapService:
+        if self.wms is None:
+            self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version)
+        return self.wms
+
+    def get_ele(self, lon_lat: LonLat) -> Optional[float]:
+        wms = self.get_wms()
+        return get_ele_from_wms(lon_lat, wms, self.layer_name)
+
+    def is_in_bbox(self, lon_lat: LonLat) -> bool:
+        bbox, crs = get_bbox_from_wms(self.get_wms(), self.layer_name)
+        x_y = transform_lon_lat(lon_lat, crs)
+        return _is_in_bbox(x_y, bbox)
+
+
+class DemSouthTyrol(DemWmsBase):
+    def __init__(self):
+        url = 'http://geoservices.buergernetz.bz.it/geoserver/p_bz-elevation/ows?' \
+              'SERVICE=WMS&request=getCapabilities'
+        super().__init__(url, '1.3.0', 'DTM-2p5m')
+
+
+class MultiDem:
+    def __init__(self):
+        self.dems = [DemBasemap(), DemSouthTyrol(), DemBavaria(), DemSwitzerland()]
+
+    def close(self):
+        for dem in self.dems:
+            dem.close()
 
-    def get_ele(self, lon_lat: LonLat) -> float:
-        return get_ele_from_raster(self.filename, lon_lat)
+    def get_ele(self, lon_lat: LonLat) -> Tuple[Optional[float], Optional[str]]:
+        for dem in self.dems:
+            if dem.is_in_bbox(lon_lat):
+                ele = dem.get_ele(lon_lat)
+                if ele is not None:
+                    return ele, dem.get_name()
+        return None, None