3 from abc import abstractmethod
4 from typing import Optional, Tuple
6 import fiona.crs # pip install Fiona
11 import rasterio.sample
12 from rasterio import DatasetReader
14 from wrpylib.wrvalidators import LonLat
16 LON_LAT_CRS = fiona.crs.from_epsg(4326)
19 XY = Tuple[float, float]
22 def get_ele_from_raster(dataset: DatasetReader, x_y: XY, band: int = 1, crs: str = None) -> Optional[float]:
24 Examples of filenames:
26 * 'file:///path/to/file.tif'
27 * 'zip:///path/to/file.zip!/folder/file.tif'
28 * 'zip+file:///path/to/file.zip!/folder/file.tif'
31 >>> lon_lat = LonLat(11.2, 47.8)
32 >>> filename = '/path/to/file.tif'
33 >>> with rasterio.open(filename) as dataset:
34 >>> crs = dataset.crs.to_proj4()
35 >>> x_y = transform_lon_lat(lon_lat, crs)
36 >>> ele = get_ele_from_raster(dataset, x_y)
38 ele = list(rasterio.sample.sample_gen(dataset, [x_y], band))[0]
39 return None if ele == dataset.nodata else float(ele)
42 def get_bbox_from_wms(wms: owslib.wms.WebMapService, layer_name: str) -> Tuple[Tuple[float, float, float, float], str]:
43 """Returns the bounding box in dataset CRS."""
44 layer = wms.contents[layer_name]
45 bbox_crs = layer.boundingBox
46 return bbox_crs[:4], bbox_crs[-1]
49 def get_ele_from_wms(lon_lat: LonLat, wms: owslib.wms.WebMapService, layer_name: str) -> Optional[float]:
54 bbox_height_m = bbox_width_m
55 left_bottom = _shift_lon_lat(lon_lat, -bbox_height_m / 2., -bbox_height_m / 2.)
56 right_top = _shift_lon_lat(lon_lat, bbox_height_m / 2., bbox_height_m / 2.)
57 response = wms.getfeatureinfo(
60 bbox=(left_bottom.lon, left_bottom.lat, right_top.lon, right_top.lat),
61 size=(bbox_width_px, bbox_height_px),
62 info_format='application/json',
63 xy=(bbox_width_px // 2, bbox_height_px // 2),
65 info_json_str = response.read().decode()
66 info_json = json.loads(info_json_str)
67 assert info_json['type'] == 'FeatureCollection'
68 features = info_json['features']
69 assert len(features) <= 1
70 if len(features) == 0:
73 assert feature['type'] == 'Feature'
74 ele = feature['properties']['GRAY_INDEX']
80 def transform_lon_lat(lon_lat: LonLat, crs: str) -> XY:
81 """Transform the coordinates to dataset CRS."""
82 xs, ys = fiona.transform.transform(LON_LAT_CRS, crs, [lon_lat.lon], [lon_lat.lat])
83 return [(x, y) for x, y in zip(xs, ys)][0]
86 def _shift_lon_lat(orig: LonLat, west_m: float, north_m: float) -> LonLat:
87 """Shifts a given geographic coordinate by a given metric amount"""
88 meter_per_degree = 10_000_000 / 90 # distance equator to pole is 10^7 m
89 return LonLat(orig.lon + (west_m / meter_per_degree) * math.cos(orig.lat * math.pi / 180.),
90 orig.lat + north_m / meter_per_degree)
93 def _is_in_bbox(x_y: Tuple[float, float], bbox: Tuple[float, float, float, float]) -> bool:
95 return x0 <= x_y[0] <= x1 and y0 <= x_y[1] <= y1
102 def get_name(self) -> str:
103 return self.__class__.__name__
106 def get_ele(self, lon_lat: LonLat) -> Optional[float]:
110 def is_in_bbox(self, lon_lat: LonLat) -> bool:
114 class DemRasterBase(DemBase):
115 def __init__(self, filename: str, band: int = 1, crs: str = None):
116 self.filename = filename
118 self.dataset = rasterio.open(filename)
119 self.crs = crs if crs else self.dataset.crs.to_proj4()
124 def get_ele(self, lon_lat: LonLat) -> Optional[float]:
125 x_y = transform_lon_lat(lon_lat, self.crs)
126 return get_ele_from_raster(self.dataset, x_y, self.band, self.crs)
128 def is_in_bbox(self, lon_lat: LonLat) -> bool:
129 bbox = self.dataset.bounds
130 x_y = transform_lon_lat(lon_lat, self.crs)
131 return _is_in_bbox(x_y, bbox)
134 class DemBasemap(DemRasterBase):
136 filename = '/home/philipp/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif'
137 super().__init__(filename)
140 class DemSwitzerland(DemRasterBase):
142 filename = 'zip:///home/philipp/daten/GeoData/dem/schweiz_200m/dem_schweiz_200m.zip!data/DHM200.asc'
143 super().__init__(filename, crs='EPSG:21781')
146 class DemBavaria(DemRasterBase):
148 filename = 'zip:///home/philipp/daten/GeoData/dem/bayern_50m/dgm50_epsg25832.tif.zip!dgm50_epsg25832.tif'
149 super().__init__(filename)
152 class DemWmsBase(DemBase):
153 def __init__(self, url: str, wms_version: str, layer_name: str):
155 self.wms_version = wms_version
156 self.layer_name = layer_name
162 def get_wms(self) -> owslib.wms.WebMapService:
164 self.wms = owslib.wms.WebMapService(self.url, version=self.wms_version)
167 def get_ele(self, lon_lat: LonLat) -> Optional[float]:
169 return get_ele_from_wms(lon_lat, wms, self.layer_name)
171 def is_in_bbox(self, lon_lat: LonLat) -> bool:
172 bbox, crs = get_bbox_from_wms(self.get_wms(), self.layer_name)
173 x_y = transform_lon_lat(lon_lat, crs)
174 return _is_in_bbox(x_y, bbox)
177 class DemSouthTyrol(DemWmsBase):
179 url = 'http://geoservices.buergernetz.bz.it/geoserver/p_bz-elevation/ows?' \
180 'SERVICE=WMS&request=getCapabilities'
181 super().__init__(url, '1.3.0', 'DTM-2p5m')
189 # DemBavaria(), # too coarse - we should have a 25 m grid or denser - this is 50 m
190 # DemSwitzerland(), # too coarse - we should have a 25 m grid or denser - this is 200 m
194 for dem in self.dems:
197 def get_ele(self, lon_lat: LonLat) -> Tuple[Optional[float], Optional[str]]:
198 for dem in self.dems:
199 if dem.is_in_bbox(lon_lat):
200 ele = dem.get_ele(lon_lat)
202 return ele, dem.get_name()