Implement bounding box check.
[philipp/winterrodeln/wrpylib.git] / wrpylib / wrdem.py
1 import json
2 import math
3 from abc import abstractmethod
4 from typing import Optional, Tuple
5
6 import fiona.crs
7 import fiona.transform
8 import owslib.crs
9 import owslib.wms
10 import rasterio
11 import rasterio.sample
12 from rasterio import DatasetReader
13
14 from wrpylib.wrvalidators import LonLat
15
16 LON_LAT_CRS = fiona.crs.from_epsg(4326)
17
18
19 def get_bbox_from_raster(dataset: DatasetReader) -> Tuple[Tuple[float, float, float, float], str]:
20     """Returns the bounding box in dataset CRS."""
21     return dataset.bounds, dataset.crs.to_proj4()
22
23
24 def get_ele_from_raster(lon_lat: LonLat, dataset: DatasetReader, band: int = 1) -> Optional[float]:
25     """
26     Examples of filenames:
27     * '/path/to/file.tif'
28     * 'file:///path/to/file.tif'
29     * 'zip:///path/to/file.zip!/folder/file.tif'
30     * 'zip+file:///path/to/file.zip!/folder/file.tif'
31
32     Usage:
33     >>> filename = '/path/to/file.tif'
34     >>> with rasterio.open(filename) as dataset:
35     >>>     ele = get_ele_from_raster(lon_lat, dataset)
36     """
37     x_y = transform_lon_lat(lon_lat, dataset.crs.to_proj4())
38     ele = list(rasterio.sample.sample_gen(dataset, [x_y], band))[0]
39     return None if ele == dataset.nodata else float(ele)
40
41
42 def get_bbox_from_wms(wms: owslib.wms.WebMapService, layer_name: str) -> Tuple[Tuple[float, float, float, float], str]:
43     """Returns the bounding box in dataset CRS."""
44     layer = wms.contents[layer_name]
45     bbox_crs = layer.boundingBox
46     return bbox_crs[:4], bbox_crs[-1]
47
48
49 def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]:
50     nan_value = -9999
51     bbox_width_px = 3
52     bbox_height_px = 3
53     bbox_width_m = 5.
54     bbox_height_m = bbox_width_m
55     left_bottom = _shift_lon_lat(lon_lat, -bbox_height_m / 2., -bbox_height_m / 2.)
56     right_top = _shift_lon_lat(lon_lat, bbox_height_m / 2., bbox_height_m / 2.)
57     response = wms.getfeatureinfo(
58         layers=[layer_name],
59         srs='CRS:84',
60         bbox=(left_bottom.lon, left_bottom.lat, right_top.lon, right_top.lat),
61         size=(bbox_width_px, bbox_height_px),
62         info_format='application/json',
63         xy=(bbox_width_px // 2, bbox_height_px // 2),
64     )
65     info_json_str = response.read().decode()
66     info_json = json.loads(info_json_str)
67     assert info_json['type'] == 'FeatureCollection'
68     features = info_json['features']
69     assert len(features) <= 1
70     if len(features) == 0:
71         return None
72     feature = features[0]
73     assert feature['type'] == 'Feature'
74     ele = feature['properties']['GRAY_INDEX']
75     if ele == nan_value:
76         return None
77     return ele
78
79
80 def transform_lon_lat(lon_lat: LonLat, crs: str) -> Tuple[float, float]:
81     """Transform the coordinates to dataset CRS."""
82     xs, ys = fiona.transform.transform(LON_LAT_CRS, crs, [lon_lat.lon], [lon_lat.lat])
83     return [(x, y) for x, y in zip(xs, ys)][0]
84
85
86 def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
87     """Shifts a given geographic coordinate by a given metric amount"""
88     meter_per_degree = 10_000_000 / 90  # distance equator to pole is 10^7 m
89     return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
90                   orig.lat + north_m / meter_per_degree)
91
92
93 def _is_in_bbox(x_y: Tuple[float, float], bbox: Tuple[float, float, float, float]) -> bool:
94     x0, y0, x1, y1 = bbox
95     return x0 <= x_y[0] <= x1 and y0 <= x_y[1] <= y1
96
97
98 class DemBase:
99     def close(self):
100         pass
101
102     def get_name(self) -> str:
103         return self.__class__.__name__
104
105     @abstractmethod
106     def get_ele(self, lon_lat: LonLat) -> Optional[float]:
107         return None
108
109     @abstractmethod
110     def is_in_bbox(self, lon_lat: LonLat) -> bool:
111         pass
112
113
114 class DemRasterBase(DemBase):
115     def __init__(self, filename: str, band: int = 1):
116         self.filename = filename
117         self.band = band
118         self.dataset = rasterio.open(filename)
119
120     def close(self):
121         self.dataset.close()
122
123     def get_ele(self, lon_lat: LonLat) -> Optional[float]:
124         return get_ele_from_raster(lon_lat, self.dataset, self.band)
125
126     def is_in_bbox(self, lon_lat: LonLat) -> bool:
127         bbox, crs = get_bbox_from_raster(self.dataset)
128         x_y = transform_lon_lat(lon_lat, crs)
129         return _is_in_bbox(x_y, bbox)
130
131
132 class DemBasemap(DemRasterBase):
133     def __init__(self):
134         filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
135         super().__init__(filename)
136
137
138 class DemWmsBase(DemBase):
139     def __init__(self, url: str, wms_version: str, layer_name: str):
140         self.url = url
141         self.wms_version = wms_version
142         self.layer_name = layer_name
143         self.wms = None
144
145     def close(self):
146         pass
147
148     def get_wms(self) -> owslib.wms.WebMapService:
149         if self.wms is None:
150             self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version)
151         return self.wms
152
153     def get_ele(self, lon_lat: LonLat) -> Optional[float]:
154         wms = self.get_wms()
155         return get_ele_from_wms(lon_lat, wms, self.layer_name)
156
157     def is_in_bbox(self, lon_lat: LonLat) -> bool:
158         bbox, crs = get_bbox_from_wms(self.get_wms(), self.layer_name)
159         x_y = transform_lon_lat(lon_lat, crs)
160         return _is_in_bbox(x_y, bbox)
161
162
163 class DemSouthTyrol(DemWmsBase):
164     def __init__(self):
165         url = 'http://geoservices.buergernetz.bz.it/geoserver/p_bz-elevation/ows?' \
166               'SERVICE=WMS&request=getCapabilities'
167         super().__init__(url, '1.3.0', 'DTM-2p5m')
168
169
170 class MultiDem:
171     def __init__(self):
172         self.dems = [DemBasemap(), DemSouthTyrol()]
173
174     def close(self):
175         for dem in self.dems:
176             dem.close()
177
178     def get_ele(self, lon_lat: LonLat) -> Tuple[Optional[float], Optional[str]]:
179         for dem in self.dems:
180             if dem.is_in_bbox(lon_lat):
181                 ele = dem.get_ele(lon_lat)
182                 if ele is not None:
183                     return ele, dem.get_name()
184         return None, None