Add Bavaria.
[philipp/winterrodeln/wrpylib.git] / wrpylib / wrdem.py
index b43cec89df37691014354dd4f8fc160871cddb39..52b41d7fc54337c40883692a4e9df6d406c39875 100644 (file)
@@ -1,5 +1,7 @@
 import json
 import math
+from abc import abstractmethod
+from collections import namedtuple
 from typing import Optional, Tuple
 
 import fiona.crs
@@ -8,32 +10,41 @@ import owslib.crs
 import owslib.wms
 import rasterio
 import rasterio.sample
+from rasterio import DatasetReader
 
 from wrpylib.wrvalidators import LonLat
 
 LON_LAT_CRS = fiona.crs.from_epsg(4326)
 
 
-def get_ele_from_raster(lon_lat: LonLat, filename: str, band: int = 1) -> Optional[float]:
+XY = Tuple[float, float]
+
+
+def get_ele_from_raster(dataset: DatasetReader, x_y: XY, band: int = 1, crs: str = None) -> Optional[float]:
     """
     Examples of filenames:
     * '/path/to/file.tif'
     * 'file:///path/to/file.tif'
     * 'zip:///path/to/file.zip!/folder/file.tif'
     * 'zip+file:///path/to/file.zip!/folder/file.tif'
+
+    Usage:
+    >>> lon_lat = LonLat(11.2, 47.8)
+    >>> filename = '/path/to/file.tif'
+    >>> with rasterio.open(filename) as dataset:
+    >>>     crs = dataset.crs.to_proj4()
+    >>>     x_y = transform_lon_lat(lon_lat, crs)
+    >>>     ele = get_ele_from_raster(dataset, x_y)
     """
-    with rasterio.open(filename) as dataset:
-        xs, ys = fiona.transform.transform(LON_LAT_CRS, dataset.crs.to_proj4(), [lon_lat.lon], [lon_lat.lat])
-        xy_list = [(x, y) for x, y in zip(xs, ys)]
-        ele = list(rasterio.sample.sample_gen(dataset, xy_list, band))[0]
-        return None if ele == dataset.nodata else float(ele)
+    ele = list(rasterio.sample.sample_gen(dataset, [x_y], band))[0]
+    return None if ele == dataset.nodata else float(ele)
 
 
-def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
-    """Shifts a given geographic coordinate by a given metric amount"""
-    meter_per_degree = 10_000_000 / 90  # distance equator to pole is 10^7 m
-    return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
-                  orig.lat + north_m / meter_per_degree)
+def get_bbox_from_wms(wms: owslib.wms.WebMapService, layer_name: str) -> Tuple[Tuple[float, float, float, float], str]:
+    """Returns the bounding box in dataset CRS."""
+    layer = wms.contents[layer_name]
+    bbox_crs = layer.boundingBox
+    return bbox_crs[:4], bbox_crs[-1]
 
 
 def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]:
@@ -67,21 +78,76 @@ def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name:
     return ele
 
 
+def transform_lon_lat(lon_lat: LonLat, crs: str) -> XY:
+    """Transform the coordinates to dataset CRS."""
+    xs, ys = fiona.transform.transform(LON_LAT_CRS, crs, [lon_lat.lon], [lon_lat.lat])
+    return [(x, y) for x, y in zip(xs, ys)][0]
+
+
+def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
+    """Shifts a given geographic coordinate by a given metric amount"""
+    meter_per_degree = 10_000_000 / 90  # distance equator to pole is 10^7 m
+    return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
+                  orig.lat + north_m / meter_per_degree)
+
+
+def _is_in_bbox(x_y: Tuple[float, float], bbox: Tuple[float, float, float, float]) -> bool:
+    x0, y0, x1, y1 = bbox
+    return x0 <= x_y[0] <= x1 and y0 <= x_y[1] <= y1
+
+
 class DemBase:
-    def get_ele(self, lon_lat: LonLat) -> float:
-        return math.nan
+    def close(self):
+        pass
 
     def get_name(self) -> str:
         return self.__class__.__name__
 
+    @abstractmethod
+    def get_ele(self, lon_lat: LonLat) -> Optional[float]:
+        return None
+
+    @abstractmethod
+    def is_in_bbox(self, lon_lat: LonLat) -> bool:
+        pass
+
+
+class DemRasterBase(DemBase):
+    def __init__(self, filename: str, band: int = 1, crs: str = None):
+        self.filename = filename
+        self.band = band
+        self.dataset = rasterio.open(filename)
+        self.crs = crs if crs else self.dataset.crs.to_proj4()
+
+    def close(self):
+        self.dataset.close()
+
+    def get_ele(self, lon_lat: LonLat) -> Optional[float]:
+        x_y = transform_lon_lat(lon_lat, self.crs)
+        return get_ele_from_raster(self.dataset, x_y, self.band, self.crs)
+
+    def is_in_bbox(self, lon_lat: LonLat) -> bool:
+        bbox = self.dataset.bounds
+        x_y = transform_lon_lat(lon_lat, self.crs)
+        return _is_in_bbox(x_y, bbox)
+
+
+class DemBasemap(DemRasterBase):
+    def __init__(self):
+        filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
+        super().__init__(filename)
+
 
-class DemBasemap(DemBase):
+class DemSwitzerland(DemRasterBase):
     def __init__(self):
-        self.filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
-        # self.filename = 'zip:///home/philipp/daten/GeoData/dem/oesterreich_10m/ogd-10m-at.zip!dhm_lamb_10m.tif'
+        filename = 'zip:///home/philipp/daten/GeoData/dem/schweiz_200m/dem_schweiz_200m.zip!data/DHM200.asc'
+        super().__init__(filename, crs='EPSG:21781')
 
-    def get_ele(self, lon_lat: LonLat) -> float:
-        return get_ele_from_raster(lon_lat, self.filename)
+
+class DemBavaria(DemRasterBase):
+    def __init__(self):
+        filename = 'zip:///home/philipp/daten/GeoData/dem/bayern_50m/dgm50_epsg25832.tif.zip!dgm50_epsg25832.tif'
+        super().__init__(filename)
 
 
 class DemWmsBase(DemBase):
@@ -91,20 +157,23 @@ class DemWmsBase(DemBase):
         self.layer_name = layer_name
         self.wms = None
 
+    def close(self):
+        pass
+
     def get_wms(self) -> owslib.wms.WebMapService:
         if self.wms is None:
             self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version)
         return self.wms
 
-    def get_bbox(self) -> Tuple[float, float, float, float]:
-        """Return the bounding box in Lon/Lat WGS84"""
-        layer = self.wms.contents[self.layer_name]
-        return layer.boundingBoxWGS84  # (10.36155279782108, 46.18071678348443, 12.53362295003934, 47.132345455408874)
-
-    def get_ele(self, lon_lat: LonLat) -> float:
+    def get_ele(self, lon_lat: LonLat) -> Optional[float]:
         wms = self.get_wms()
         return get_ele_from_wms(lon_lat, wms, self.layer_name)
 
+    def is_in_bbox(self, lon_lat: LonLat) -> bool:
+        bbox, crs = get_bbox_from_wms(self.get_wms(), self.layer_name)
+        x_y = transform_lon_lat(lon_lat, crs)
+        return _is_in_bbox(x_y, bbox)
+
 
 class DemSouthTyrol(DemWmsBase):
     def __init__(self):
@@ -112,5 +181,19 @@ class DemSouthTyrol(DemWmsBase):
               'SERVICE=WMS&request=getCapabilities'
         super().__init__(url, '1.3.0', 'DTM-2p5m')
 
-    def get_bbox(self) -> Tuple[float, float, float, float]:
-        return 10.36155279782108, 46.18071678348443, 12.53362295003934, 47.132345455408874
+
+class MultiDem:
+    def __init__(self):
+        self.dems = [DemBasemap(), DemSouthTyrol(), DemBavaria(), DemSwitzerland()]
+
+    def close(self):
+        for dem in self.dems:
+            dem.close()
+
+    def get_ele(self, lon_lat: LonLat) -> Tuple[Optional[float], Optional[str]]:
+        for dem in self.dems:
+            if dem.is_in_bbox(lon_lat):
+                ele = dem.get_ele(lon_lat)
+                if ele is not None:
+                    return ele, dem.get_name()
+        return None, None