9184247d412beed292c5f8cdd220ec366f7650ae
[philipp/winterrodeln/wrpylib.git] / wrpylib / wrdem.py
1 import json
2 import math
3 from abc import abstractmethod
4 from collections import namedtuple
5 from typing import Optional, Tuple
6
7 import fiona.crs
8 import fiona.transform
9 import owslib.crs
10 import owslib.wms
11 import rasterio
12 import rasterio.sample
13 from rasterio import DatasetReader
14
15 from wrpylib.wrvalidators import LonLat
16
17 LON_LAT_CRS = fiona.crs.from_epsg(4326)
18
19
20 XY = Tuple[float, float]
21
22
23 def get_ele_from_raster(dataset: DatasetReader, x_y: XY, band: int = 1, crs: str = None) -> Optional[float]:
24     """
25     Examples of filenames:
26     * '/path/to/file.tif'
27     * 'file:///path/to/file.tif'
28     * 'zip:///path/to/file.zip!/folder/file.tif'
29     * 'zip+file:///path/to/file.zip!/folder/file.tif'
30
31     Usage:
32     >>> lon_lat = LonLat(11.2, 47.8)
33     >>> filename = '/path/to/file.tif'
34     >>> with rasterio.open(filename) as dataset:
35     >>>     crs = dataset.crs.to_proj4()
36     >>>     x_y = transform_lon_lat(lon_lat, crs)
37     >>>     ele = get_ele_from_raster(dataset, x_y)
38     """
39     ele = list(rasterio.sample.sample_gen(dataset, [x_y], band))[0]
40     return None if ele == dataset.nodata else float(ele)
41
42
43 def get_bbox_from_wms(wms: owslib.wms.WebMapService, layer_name: str) -> Tuple[Tuple[float, float, float, float], str]:
44     """Returns the bounding box in dataset CRS."""
45     layer = wms.contents[layer_name]
46     bbox_crs = layer.boundingBox
47     return bbox_crs[:4], bbox_crs[-1]
48
49
50 def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]:
51     nan_value = -9999
52     bbox_width_px = 3
53     bbox_height_px = 3
54     bbox_width_m = 5.
55     bbox_height_m = bbox_width_m
56     left_bottom = _shift_lon_lat(lon_lat, -bbox_height_m / 2., -bbox_height_m / 2.)
57     right_top = _shift_lon_lat(lon_lat, bbox_height_m / 2., bbox_height_m / 2.)
58     response = wms.getfeatureinfo(
59         layers=[layer_name],
60         srs='CRS:84',
61         bbox=(left_bottom.lon, left_bottom.lat, right_top.lon, right_top.lat),
62         size=(bbox_width_px, bbox_height_px),
63         info_format='application/json',
64         xy=(bbox_width_px // 2, bbox_height_px // 2),
65     )
66     info_json_str = response.read().decode()
67     info_json = json.loads(info_json_str)
68     assert info_json['type'] == 'FeatureCollection'
69     features = info_json['features']
70     assert len(features) <= 1
71     if len(features) == 0:
72         return None
73     feature = features[0]
74     assert feature['type'] == 'Feature'
75     ele = feature['properties']['GRAY_INDEX']
76     if ele == nan_value:
77         return None
78     return ele
79
80
81 def transform_lon_lat(lon_lat: LonLat, crs: str) -> XY:
82     """Transform the coordinates to dataset CRS."""
83     xs, ys = fiona.transform.transform(LON_LAT_CRS, crs, [lon_lat.lon], [lon_lat.lat])
84     return [(x, y) for x, y in zip(xs, ys)][0]
85
86
87 def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
88     """Shifts a given geographic coordinate by a given metric amount"""
89     meter_per_degree = 10_000_000 / 90  # distance equator to pole is 10^7 m
90     return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
91                   orig.lat + north_m / meter_per_degree)
92
93
94 def _is_in_bbox(x_y: Tuple[float, float], bbox: Tuple[float, float, float, float]) -> bool:
95     x0, y0, x1, y1 = bbox
96     return x0 <= x_y[0] <= x1 and y0 <= x_y[1] <= y1
97
98
99 class DemBase:
100     def close(self):
101         pass
102
103     def get_name(self) -> str:
104         return self.__class__.__name__
105
106     @abstractmethod
107     def get_ele(self, lon_lat: LonLat) -> Optional[float]:
108         return None
109
110     @abstractmethod
111     def is_in_bbox(self, lon_lat: LonLat) -> bool:
112         pass
113
114
115 class DemRasterBase(DemBase):
116     def __init__(self, filename: str, band: int = 1, crs: str = None):
117         self.filename = filename
118         self.band = band
119         self.dataset = rasterio.open(filename)
120         self.crs = crs if crs else self.dataset.crs.to_proj4()
121
122     def close(self):
123         self.dataset.close()
124
125     def get_ele(self, lon_lat: LonLat) -> Optional[float]:
126         x_y = transform_lon_lat(lon_lat, self.crs)
127         return get_ele_from_raster(self.dataset, x_y, self.band, self.crs)
128
129     def is_in_bbox(self, lon_lat: LonLat) -> bool:
130         bbox = self.dataset.bounds
131         x_y = transform_lon_lat(lon_lat, self.crs)
132         return _is_in_bbox(x_y, bbox)
133
134
135 class DemBasemap(DemRasterBase):
136     def __init__(self):
137         filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
138         super().__init__(filename)
139
140
141 class DemSwitzerland(DemRasterBase):
142     def __init__(self):
143         filename = '/home/philipp/daten/GeoData/dem/schweiz_200m/data/DHM200.xyz'
144         filename = '/home/philipp/daten/GeoData/dem/schweiz_200m/data/DHM200.asc'
145         # https://www.swisstopo.admin.ch/de/swisstopo/kostenlose-geobasisdaten.html
146         super().__init__(filename, crs='EPSG:21781')
147
148
149 class DemWmsBase(DemBase):
150     def __init__(self, url: str, wms_version: str, layer_name: str):
151         self.url = url
152         self.wms_version = wms_version
153         self.layer_name = layer_name
154         self.wms = None
155
156     def close(self):
157         pass
158
159     def get_wms(self) -> owslib.wms.WebMapService:
160         if self.wms is None:
161             self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version)
162         return self.wms
163
164     def get_ele(self, lon_lat: LonLat) -> Optional[float]:
165         wms = self.get_wms()
166         return get_ele_from_wms(lon_lat, wms, self.layer_name)
167
168     def is_in_bbox(self, lon_lat: LonLat) -> bool:
169         bbox, crs = get_bbox_from_wms(self.get_wms(), self.layer_name)
170         x_y = transform_lon_lat(lon_lat, crs)
171         return _is_in_bbox(x_y, bbox)
172
173
174 class DemSouthTyrol(DemWmsBase):
175     def __init__(self):
176         url = 'http://geoservices.buergernetz.bz.it/geoserver/p_bz-elevation/ows?' \
177               'SERVICE=WMS&request=getCapabilities'
178         super().__init__(url, '1.3.0', 'DTM-2p5m')
179
180
181 class MultiDem:
182     def __init__(self):
183         self.dems = [DemBasemap(), DemSouthTyrol(), DemSwitzerland()]
184
185     def close(self):
186         for dem in self.dems:
187             dem.close()
188
189     def get_ele(self, lon_lat: LonLat) -> Tuple[Optional[float], Optional[str]]:
190         for dem in self.dems:
191             if dem.is_in_bbox(lon_lat):
192                 ele = dem.get_ele(lon_lat)
193                 if ele is not None:
194                     return ele, dem.get_name()
195         return None, None