The order of XML attributes is now retained.
[philipp/winterrodeln/wrpylib.git] / wrpylib / wrdem.py
1 import json
2 import math
3 from abc import abstractmethod
4 from typing import Optional, Tuple
5
6 import fiona.crs  # pip install Fiona
7 import fiona.transform
8 import owslib.crs
9 import owslib.wms
10 import rasterio
11 import rasterio.sample
12 from rasterio import DatasetReader
13
14 from wrpylib.wrvalidators import LonLat
15
16 LON_LAT_CRS = fiona.crs.from_epsg(4326)
17
18
19 XY = Tuple[float, float]
20
21
22 def get_ele_from_raster(dataset: DatasetReader, x_y: XY, band: int = 1, crs: str = None) -> Optional[float]:
23     """
24     Examples of filenames:
25     * '/path/to/file.tif'
26     * 'file:///path/to/file.tif'
27     * 'zip:///path/to/file.zip!/folder/file.tif'
28     * 'zip+file:///path/to/file.zip!/folder/file.tif'
29
30     Usage:
31     >>> lon_lat = LonLat(11.2, 47.8)
32     >>> filename = '/path/to/file.tif'
33     >>> with rasterio.open(filename) as dataset:
34     >>>     crs = dataset.crs.to_proj4()
35     >>>     x_y = transform_lon_lat(lon_lat, crs)
36     >>>     ele = get_ele_from_raster(dataset, x_y)
37     """
38     ele = list(rasterio.sample.sample_gen(dataset, [x_y], band))[0]
39     return None if ele == dataset.nodata else float(ele)
40
41
42 def get_bbox_from_wms(wms: owslib.wms.WebMapService, layer_name: str) -> Tuple[Tuple[float, float, float, float], str]:
43     """Returns the bounding box in dataset CRS."""
44     layer = wms.contents[layer_name]
45     bbox_crs = layer.boundingBox
46     return bbox_crs[:4], bbox_crs[-1]
47
48
49 def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]:
50     nan_value = -9999
51     bbox_width_px = 3
52     bbox_height_px = 3
53     bbox_width_m = 5.
54     bbox_height_m = bbox_width_m
55     left_bottom = _shift_lon_lat(lon_lat, -bbox_height_m / 2., -bbox_height_m / 2.)
56     right_top = _shift_lon_lat(lon_lat, bbox_height_m / 2., bbox_height_m / 2.)
57     response = wms.getfeatureinfo(
58         layers=[layer_name],
59         srs='CRS:84',
60         bbox=(left_bottom.lon, left_bottom.lat, right_top.lon, right_top.lat),
61         size=(bbox_width_px, bbox_height_px),
62         info_format='application/json',
63         xy=(bbox_width_px // 2, bbox_height_px // 2),
64     )
65     info_json_str = response.read().decode()
66     info_json = json.loads(info_json_str)
67     assert info_json['type'] == 'FeatureCollection'
68     features = info_json['features']
69     assert len(features) <= 1
70     if len(features) == 0:
71         return None
72     feature = features[0]
73     assert feature['type'] == 'Feature'
74     ele = feature['properties']['GRAY_INDEX']
75     if ele == nan_value:
76         return None
77     return ele
78
79
80 def transform_lon_lat(lon_lat: LonLat, crs: str) -> XY:
81     """Transform the coordinates to dataset CRS."""
82     xs, ys = fiona.transform.transform(LON_LAT_CRS, crs, [lon_lat.lon], [lon_lat.lat])
83     return [(x, y) for x, y in zip(xs, ys)][0]
84
85
86 def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
87     """Shifts a given geographic coordinate by a given metric amount"""
88     meter_per_degree = 10_000_000 / 90  # distance equator to pole is 10^7 m
89     return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
90                   orig.lat + north_m / meter_per_degree)
91
92
93 def _is_in_bbox(x_y: Tuple[float, float], bbox: Tuple[float, float, float, float]) -> bool:
94     x0, y0, x1, y1 = bbox
95     return x0 <= x_y[0] <= x1 and y0 <= x_y[1] <= y1
96
97
98 class DemBase:
99     def close(self):
100         pass
101
102     def get_name(self) -> str:
103         return self.__class__.__name__
104
105     @abstractmethod
106     def get_ele(self, lon_lat: LonLat) -> Optional[float]:
107         return None
108
109     @abstractmethod
110     def is_in_bbox(self, lon_lat: LonLat) -> bool:
111         pass
112
113
114 class DemRasterBase(DemBase):
115     def __init__(self, filename: str, band: int = 1, crs: str = None):
116         self.filename = filename
117         self.band = band
118         self.dataset = rasterio.open(filename)
119         self.crs = crs if crs else self.dataset.crs.to_proj4()
120
121     def close(self):
122         self.dataset.close()
123
124     def get_ele(self, lon_lat: LonLat) -> Optional[float]:
125         x_y = transform_lon_lat(lon_lat, self.crs)
126         return get_ele_from_raster(self.dataset, x_y, self.band, self.crs)
127
128     def is_in_bbox(self, lon_lat: LonLat) -> bool:
129         bbox = self.dataset.bounds
130         x_y = transform_lon_lat(lon_lat, self.crs)
131         return _is_in_bbox(x_y, bbox)
132
133
134 class DemBasemap(DemRasterBase):
135     def __init__(self):
136         filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
137         super().__init__(filename)
138
139
140 class DemSwitzerland(DemRasterBase):
141     def __init__(self):
142         filename = 'zip:///home/philipp/daten/GeoData/dem/schweiz_200m/dem_schweiz_200m.zip!data/DHM200.asc'
143         super().__init__(filename, crs='EPSG:21781')
144
145
146 class DemBavaria(DemRasterBase):
147     def __init__(self):
148         filename = 'zip:///home/philipp/daten/GeoData/dem/bayern_50m/dgm50_epsg25832.tif.zip!dgm50_epsg25832.tif'
149         super().__init__(filename)
150
151
152 class DemWmsBase(DemBase):
153     def __init__(self, url: str, wms_version: str, layer_name: str):
154         self.url = url
155         self.wms_version = wms_version
156         self.layer_name = layer_name
157         self.wms = None
158
159     def close(self):
160         pass
161
162     def get_wms(self) -> owslib.wms.WebMapService:
163         if self.wms is None:
164             self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version)
165         return self.wms
166
167     def get_ele(self, lon_lat: LonLat) -> Optional[float]:
168         wms = self.get_wms()
169         return get_ele_from_wms(lon_lat, wms, self.layer_name)
170
171     def is_in_bbox(self, lon_lat: LonLat) -> bool:
172         bbox, crs = get_bbox_from_wms(self.get_wms(), self.layer_name)
173         x_y = transform_lon_lat(lon_lat, crs)
174         return _is_in_bbox(x_y, bbox)
175
176
177 class DemSouthTyrol(DemWmsBase):
178     def __init__(self):
179         url = 'http://geoservices.buergernetz.bz.it/geoserver/p_bz-elevation/ows?' \
180               'SERVICE=WMS&request=getCapabilities'
181         super().__init__(url, '1.3.0', 'DTM-2p5m')
182
183
184 class MultiDem:
185     def __init__(self):
186         self.dems = [
187             DemBasemap(),
188             DemSouthTyrol(),
189             # DemBavaria(),  # too coarse - we should have a 25 m grid or denser - this is 50 m
190             # DemSwitzerland(), # too coarse - we should have a 25 m grid or denser - this is 200 m
191         ]
192
193     def close(self):
194         for dem in self.dems:
195             dem.close()
196
197     def get_ele(self, lon_lat: LonLat) -> Tuple[Optional[float], Optional[str]]:
198         for dem in self.dems:
199             if dem.is_in_bbox(lon_lat):
200                 ele = dem.get_ele(lon_lat)
201                 if ele is not None:
202                     return ele, dem.get_name()
203         return None, None