Implement bounding box check.
authorPhilipp Spitzer <philipp@spitzer.priv.at>
Thu, 11 Feb 2021 11:10:05 +0000 (12:10 +0100)
committerPhilipp Spitzer <philipp@spitzer.priv.at>
Thu, 11 Feb 2021 11:10:05 +0000 (12:10 +0100)
scripts/get_elevation.py
tests/test_wrdem.py
wrpylib/wrdem.py

index c7f8cc0acde33a33c9073ca18dc5545878aa5483..df1914f4131872b6c496f414c2489b8d250780f5 100644 (file)
@@ -1,18 +1,17 @@
 import argparse
 
 import argparse
 
-from wrpylib.wrdem import DemBasemap, DemSouthTyrol
+from wrpylib.wrdem import MultiDem
 from wrpylib.wrvalidators import lonlat_from_str, LonLat, lonlat_to_str
 
 
 from wrpylib.wrvalidators import lonlat_from_str, LonLat, lonlat_to_str
 
 
-def main(lonlat: LonLat):
+def main(lon_lat: LonLat):
     """Coordinate like "47.275712 N 11.3456496 E"."""
     """Coordinate like "47.275712 N 11.3456496 E"."""
-    dems = [DemBasemap(), DemSouthTyrol()]
-    for dem in dems:
-        ele = dem.get_ele(lonlat)
-        if ele is not None:
-            print(f'Elevation for {lonlat_to_str(lonlat)}: {ele:.0f} m (source: {dem.get_name()})')
-            return
-    print(f'No elevation data available for {lonlat_to_str(lonlat)}')
+    dem = MultiDem()
+    ele, source = dem.get_ele(lon_lat)
+    if ele is None:
+        print(f'No elevation data available for {lonlat_to_str(lon_lat)}')
+    else:
+        print(f'Elevation for {lonlat_to_str(lon_lat)}: {ele:.0f} m (source: {source})')
 
 
 if __name__ == '__main__':
 
 
 if __name__ == '__main__':
index 8f166db49b4c2d0cec990ad045d600d2bc7885d6..05700d57109e907c4aedc450545055a9d57aea9c 100644 (file)
@@ -1,27 +1,32 @@
 import unittest
 
 import owslib.wms
 import unittest
 
 import owslib.wms
+import rasterio
 
 
-from wrpylib.wrdem import get_ele_from_raster, get_ele_from_wms
+from wrpylib.wrdem import get_ele_from_raster, get_ele_from_wms, MultiDem
 from wrpylib.wrvalidators import LonLat
 
 
 class TestWrDemRaster(unittest.TestCase):
     def setUp(self):
 from wrpylib.wrvalidators import LonLat
 
 
 class TestWrDemRaster(unittest.TestCase):
     def setUp(self):
-        self.filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
+        filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
         # https://www.data.gv.at/katalog/dataset/dgm
         # https://www.data.gv.at/katalog/dataset/dgm
+        self.dataset = rasterio.open(filename)
+
+    def tearDown(self):
+        self.dataset.close()
 
     def test_get_ele_from_raster_inside_valid(self):
 
     def test_get_ele_from_raster_inside_valid(self):
-        value = get_ele_from_raster(LonLat(13.682109, 47.934012), self.filename)
+        value = get_ele_from_raster(LonLat(13.682109, 47.934012), self.dataset)
         ele = round(int(value))
         self.assertEqual(ele, 557)
 
     def test_get_ele_from_raster_inside_invalid(self):
         ele = round(int(value))
         self.assertEqual(ele, 557)
 
     def test_get_ele_from_raster_inside_invalid(self):
-        value = get_ele_from_raster(LonLat(14.67656, 48.16182), self.filename)
+        value = get_ele_from_raster(LonLat(14.67656, 48.16182), self.dataset)
         self.assertIsNone(value)
 
     def test_get_ele_from_raster_outside(self):
         self.assertIsNone(value)
 
     def test_get_ele_from_raster_outside(self):
-        value = get_ele_from_raster(LonLat(8.67656, 47.16182), self.filename)
+        value = get_ele_from_raster(LonLat(8.67656, 47.16182), self.dataset)
         self.assertIsNone(value)
 
 
         self.assertIsNone(value)
 
 
@@ -43,3 +48,22 @@ class TestWrDemWms(unittest.TestCase):
     def test_get_ele_from_wms_outside(self):
         ele = get_ele_from_wms(LonLat(9.908062, 45.948257), self.wms, self.layer)
         self.assertIsNone(ele)
     def test_get_ele_from_wms_outside(self):
         ele = get_ele_from_wms(LonLat(9.908062, 45.948257), self.wms, self.layer)
         self.assertIsNone(ele)
+
+
+class TestWrDemMultiDem(unittest.TestCase):
+    def setUp(self):
+        self.dem = MultiDem()
+
+    def tearDown(self):
+        self.dem.close()
+
+    def test_get_ele_valid(self):
+        ele, name = self.dem.get_ele(LonLat(13.682109, 47.934012))
+        self.assertAlmostEqual(ele, 557, delta=0.5)
+
+        ele, name = self.dem.get_ele(LonLat(11.31786, 46.68018))
+        self.assertAlmostEqual(ele, 2140.460, delta=0.1)
+
+    def test_get_ele_outside(self):
+        ele, name = self.dem.get_ele(LonLat(1., 2.))
+        self.assertIsNone(ele)
index b43cec89df37691014354dd4f8fc160871cddb39..4cc9a937aa311795f9f947c4bd0db9f268f8fbe9 100644 (file)
@@ -1,5 +1,6 @@
 import json
 import math
 import json
 import math
+from abc import abstractmethod
 from typing import Optional, Tuple
 
 import fiona.crs
 from typing import Optional, Tuple
 
 import fiona.crs
@@ -8,32 +9,41 @@ import owslib.crs
 import owslib.wms
 import rasterio
 import rasterio.sample
 import owslib.wms
 import rasterio
 import rasterio.sample
+from rasterio import DatasetReader
 
 from wrpylib.wrvalidators import LonLat
 
 LON_LAT_CRS = fiona.crs.from_epsg(4326)
 
 
 
 from wrpylib.wrvalidators import LonLat
 
 LON_LAT_CRS = fiona.crs.from_epsg(4326)
 
 
-def get_ele_from_raster(lon_lat: LonLat, filename: str, band: int = 1) -> Optional[float]:
+def get_bbox_from_raster(dataset: DatasetReader) -> Tuple[Tuple[float, float, float, float], str]:
+    """Returns the bounding box in dataset CRS."""
+    return dataset.bounds, dataset.crs.to_proj4()
+
+
+def get_ele_from_raster(lon_lat: LonLat, dataset: DatasetReader, band: int = 1) -> Optional[float]:
     """
     Examples of filenames:
     * '/path/to/file.tif'
     * 'file:///path/to/file.tif'
     * 'zip:///path/to/file.zip!/folder/file.tif'
     * 'zip+file:///path/to/file.zip!/folder/file.tif'
     """
     Examples of filenames:
     * '/path/to/file.tif'
     * 'file:///path/to/file.tif'
     * 'zip:///path/to/file.zip!/folder/file.tif'
     * 'zip+file:///path/to/file.zip!/folder/file.tif'
+
+    Usage:
+    >>> filename = '/path/to/file.tif'
+    >>> with rasterio.open(filename) as dataset:
+    >>>     ele = get_ele_from_raster(lon_lat, dataset)
     """
     """
-    with rasterio.open(filename) as dataset:
-        xs, ys = fiona.transform.transform(LON_LAT_CRS, dataset.crs.to_proj4(), [lon_lat.lon], [lon_lat.lat])
-        xy_list = [(x, y) for x, y in zip(xs, ys)]
-        ele = list(rasterio.sample.sample_gen(dataset, xy_list, band))[0]
-        return None if ele == dataset.nodata else float(ele)
+    x_y = transform_lon_lat(lon_lat, dataset.crs.to_proj4())
+    ele = list(rasterio.sample.sample_gen(dataset, [x_y], band))[0]
+    return None if ele == dataset.nodata else float(ele)
 
 
 
 
-def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
-    """Shifts a given geographic coordinate by a given metric amount"""
-    meter_per_degree = 10_000_000 / 90  # distance equator to pole is 10^7 m
-    return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
-                  orig.lat + north_m / meter_per_degree)
+def get_bbox_from_wms(wms: owslib.wms.WebMapService, layer_name: str) -> Tuple[Tuple[float, float, float, float], str]:
+    """Returns the bounding box in dataset CRS."""
+    layer = wms.contents[layer_name]
+    bbox_crs = layer.boundingBox
+    return bbox_crs[:4], bbox_crs[-1]
 
 
 def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]:
 
 
 def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]:
@@ -67,21 +77,62 @@ def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name:
     return ele
 
 
     return ele
 
 
+def transform_lon_lat(lon_lat: LonLat, crs: str) -> Tuple[float, float]:
+    """Transform the coordinates to dataset CRS."""
+    xs, ys = fiona.transform.transform(LON_LAT_CRS, crs, [lon_lat.lon], [lon_lat.lat])
+    return [(x, y) for x, y in zip(xs, ys)][0]
+
+
+def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
+    """Shifts a given geographic coordinate by a given metric amount"""
+    meter_per_degree = 10_000_000 / 90  # distance equator to pole is 10^7 m
+    return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
+                  orig.lat + north_m / meter_per_degree)
+
+
+def _is_in_bbox(x_y: Tuple[float, float], bbox: Tuple[float, float, float, float]) -> bool:
+    x0, y0, x1, y1 = bbox
+    return x0 <= x_y[0] <= x1 and y0 <= x_y[1] <= y1
+
+
 class DemBase:
 class DemBase:
-    def get_ele(self, lon_lat: LonLat) -> float:
-        return math.nan
+    def close(self):
+        pass
 
     def get_name(self) -> str:
         return self.__class__.__name__
 
 
     def get_name(self) -> str:
         return self.__class__.__name__
 
+    @abstractmethod
+    def get_ele(self, lon_lat: LonLat) -> Optional[float]:
+        return None
 
 
-class DemBasemap(DemBase):
-    def __init__(self):
-        self.filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
-        # self.filename = 'zip:///home/philipp/daten/GeoData/dem/oesterreich_10m/ogd-10m-at.zip!dhm_lamb_10m.tif'
+    @abstractmethod
+    def is_in_bbox(self, lon_lat: LonLat) -> bool:
+        pass
+
+
+class DemRasterBase(DemBase):
+    def __init__(self, filename: str, band: int = 1):
+        self.filename = filename
+        self.band = band
+        self.dataset = rasterio.open(filename)
+
+    def close(self):
+        self.dataset.close()
+
+    def get_ele(self, lon_lat: LonLat) -> Optional[float]:
+        return get_ele_from_raster(lon_lat, self.dataset, self.band)
 
 
-    def get_ele(self, lon_lat: LonLat) -> float:
-        return get_ele_from_raster(lon_lat, self.filename)
+    def is_in_bbox(self, lon_lat: LonLat) -> bool:
+        bbox, crs = get_bbox_from_raster(self.dataset)
+        x_y = transform_lon_lat(lon_lat, crs)
+        return _is_in_bbox(x_y, bbox)
+
+
+class DemBasemap(DemRasterBase):
+    def __init__(self):
+        filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
+        super().__init__(filename)
 
 
 class DemWmsBase(DemBase):
 
 
 class DemWmsBase(DemBase):
@@ -91,20 +142,23 @@ class DemWmsBase(DemBase):
         self.layer_name = layer_name
         self.wms = None
 
         self.layer_name = layer_name
         self.wms = None
 
+    def close(self):
+        pass
+
     def get_wms(self) -> owslib.wms.WebMapService:
         if self.wms is None:
             self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version)
         return self.wms
 
     def get_wms(self) -> owslib.wms.WebMapService:
         if self.wms is None:
             self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version)
         return self.wms
 
-    def get_bbox(self) -> Tuple[float, float, float, float]:
-        """Return the bounding box in Lon/Lat WGS84"""
-        layer = self.wms.contents[self.layer_name]
-        return layer.boundingBoxWGS84  # (10.36155279782108, 46.18071678348443, 12.53362295003934, 47.132345455408874)
-
-    def get_ele(self, lon_lat: LonLat) -> float:
+    def get_ele(self, lon_lat: LonLat) -> Optional[float]:
         wms = self.get_wms()
         return get_ele_from_wms(lon_lat, wms, self.layer_name)
 
         wms = self.get_wms()
         return get_ele_from_wms(lon_lat, wms, self.layer_name)
 
+    def is_in_bbox(self, lon_lat: LonLat) -> bool:
+        bbox, crs = get_bbox_from_wms(self.get_wms(), self.layer_name)
+        x_y = transform_lon_lat(lon_lat, crs)
+        return _is_in_bbox(x_y, bbox)
+
 
 class DemSouthTyrol(DemWmsBase):
     def __init__(self):
 
 class DemSouthTyrol(DemWmsBase):
     def __init__(self):
@@ -112,5 +166,19 @@ class DemSouthTyrol(DemWmsBase):
               'SERVICE=WMS&request=getCapabilities'
         super().__init__(url, '1.3.0', 'DTM-2p5m')
 
               'SERVICE=WMS&request=getCapabilities'
         super().__init__(url, '1.3.0', 'DTM-2p5m')
 
-    def get_bbox(self) -> Tuple[float, float, float, float]:
-        return 10.36155279782108, 46.18071678348443, 12.53362295003934, 47.132345455408874
+
+class MultiDem:
+    def __init__(self):
+        self.dems = [DemBasemap(), DemSouthTyrol()]
+
+    def close(self):
+        for dem in self.dems:
+            dem.close()
+
+    def get_ele(self, lon_lat: LonLat) -> Tuple[Optional[float], Optional[str]]:
+        for dem in self.dems:
+            if dem.is_in_bbox(lon_lat):
+                ele = dem.get_ele(lon_lat)
+                if ele is not None:
+                    return ele, dem.get_name()
+        return None, None