Implement DEM for South Tyrol.
[philipp/winterrodeln/wrpylib.git] / wrpylib / wrdem.py
1 import json
2 import math
3 from typing import Optional, Tuple
4
5 import fiona.crs
6 import fiona.transform
7 import owslib.crs
8 import owslib.wms
9 import rasterio
10 import rasterio.sample
11
12 from wrpylib.wrvalidators import LonLat
13
14 LON_LAT_CRS = fiona.crs.from_epsg(4326)
15
16
17 def get_ele_from_raster(lon_lat: LonLat, filename: str, band: int = 1) -> Optional[float]:
18     """
19     Examples of filenames:
20     * '/path/to/file.tif'
21     * 'file:///path/to/file.tif'
22     * 'zip:///path/to/file.zip!/folder/file.tif'
23     * 'zip+file:///path/to/file.zip!/folder/file.tif'
24     """
25     with rasterio.open(filename) as dataset:
26         xs, ys = fiona.transform.transform(LON_LAT_CRS, dataset.crs.to_proj4(), [lon_lat.lon], [lon_lat.lat])
27         xy_list = [(x, y) for x, y in zip(xs, ys)]
28         ele = list(rasterio.sample.sample_gen(dataset, xy_list, band))[0]
29         return None if ele == dataset.nodata else float(ele)
30
31
32 def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
33     """Shifts a given geographic coordinate by a given metric amount"""
34     meter_per_degree = 10_000_000 / 90  # distance equator to pole is 10^7 m
35     return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
36                   orig.lat + north_m / meter_per_degree)
37
38
39 def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]:
40     nan_value = -9999
41     bbox_width_px = 3
42     bbox_height_px = 3
43     bbox_width_m = 5.
44     bbox_height_m = bbox_width_m
45     left_bottom = _shift_lon_lat(lon_lat, -bbox_height_m / 2., -bbox_height_m / 2.)
46     right_top = _shift_lon_lat(lon_lat, bbox_height_m / 2., bbox_height_m / 2.)
47     response = wms.getfeatureinfo(
48         layers=[layer_name],
49         srs='CRS:84',
50         bbox=(left_bottom.lon, left_bottom.lat, right_top.lon, right_top.lat),
51         size=(bbox_width_px, bbox_height_px),
52         info_format='application/json',
53         xy=(bbox_width_px // 2, bbox_height_px // 2),
54     )
55     info_json_str = response.read().decode()
56     info_json = json.loads(info_json_str)
57     assert info_json['type'] == 'FeatureCollection'
58     features = info_json['features']
59     assert len(features) <= 1
60     if len(features) == 0:
61         return None
62     feature = features[0]
63     assert feature['type'] == 'Feature'
64     ele = feature['properties']['GRAY_INDEX']
65     if ele == nan_value:
66         return None
67     return ele
68
69
70 class DemBase:
71     def get_ele(self, lon_lat: LonLat) -> float:
72         return math.nan
73
74     def get_name(self) -> str:
75         return self.__class__.__name__
76
77
78 class DemBasemap(DemBase):
79     def __init__(self):
80         self.filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
81         # self.filename = 'zip:///home/philipp/daten/GeoData/dem/oesterreich_10m/ogd-10m-at.zip!dhm_lamb_10m.tif'
82
83     def get_ele(self, lon_lat: LonLat) -> float:
84         return get_ele_from_raster(lon_lat, self.filename)
85
86
87 class DemWmsBase(DemBase):
88     def __init__(self, url: str, wms_version: str, layer_name: str):
89         self.url = url
90         self.wms_version = wms_version
91         self.layer_name = layer_name
92         self.wms = None
93
94     def get_wms(self) -> owslib.wms.WebMapService:
95         if self.wms is None:
96             self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version)
97         return self.wms
98
99     def get_bbox(self) -> Tuple[float, float, float, float]:
100         """Return the bounding box in Lon/Lat WGS84"""
101         layer = self.wms.contents[self.layer_name]
102         return layer.boundingBoxWGS84  # (10.36155279782108, 46.18071678348443, 12.53362295003934, 47.132345455408874)
103
104     def get_ele(self, lon_lat: LonLat) -> float:
105         wms = self.get_wms()
106         return get_ele_from_wms(lon_lat, wms, self.layer_name)
107
108
109 class DemSouthTyrol(DemWmsBase):
110     def __init__(self):
111         url = 'http://geoservices.buergernetz.bz.it/geoserver/p_bz-elevation/ows?' \
112               'SERVICE=WMS&request=getCapabilities'
113         super().__init__(url, '1.3.0', 'DTM-2p5m')
114
115     def get_bbox(self) -> Tuple[float, float, float, float]:
116         return 10.36155279782108, 46.18071678348443, 12.53362295003934, 47.132345455408874