#!/usr/bin/python3.4 """Get most accurate elevation for a given point from predefined GDAL data sources or WMS sources. Terms: * wrgeo: string with coordinates in Winterrodeln notation, like '47.275712 N 11.3456496 E' * lonlat: tuple with WGS84 referenced longitude, latitude pair in degrees. * xy: projected pair of coordinates Useful resource https://pcjericks.github.io/py-gdalogr-cookbook/ Corner Coordinates: Upper Left ( 73120.000, 247300.000) Lower Left ( 73120.000, 230670.000) Upper Right ( 85220.000, 247300.000) Lower Right ( 85220.000, 230670.000) Center ( 79170.000, 238985.000) gdaltransform -t_srs WGS84 -s_srs EPSG:31254 80000 240000 11.3866966549679 47.2939095182383 31.7259142374596 Plan: (1) Get coordinates from command line in "Winterrodeln" format (2) Transform them to x/y float values (3) Transform WGS84 x/y values to EPSG:31254 x/y value (4) Get the nearest elevation from the DEM map See: file:///usr/share/doc/libgdal-doc/ogr/osr_tutorial.html >>> import dem >>> print(dem.wrgeo_to_ele_tirol('47.275712 N 11.3456496 E')) 844 >>> print(dem.wrgeo_to_ele_vbg('47.435870 N 10.085163 E')) 1426.52 """ import re import struct import urllib.request, urllib.parse, urllib.error from urllib.parse import urlencode import xml.etree.ElementTree as et import os.path import zipfile import shutil import argparse from collections import namedtuple import osgeo.osr import osgeo.gdal import osgeo.ogr from osgeo.gdalconst import * osgeo.gdal.UseExceptions() BBox = namedtuple('BBox', ['minx', 'maxx', 'miny', 'maxy']) # bounding box. same order as returned by .getEnvelope() of osgeo.ogr geometries # common functions # ---------------- def wrgeo_to_lonlat(wrgeo): """Transfers coordinates in the format '47.175712 N 11.1456496 E' to the tuple (11.1456496, 47.175712).""" match = re.match('(\d+\.\d+) N (\d+\.\d+) E', wrgeo) y, x = match.groups() return float(x), float(y) def lonlat_to_epsg(lon, lat, epsg): """lon/lat in WGS84 to specific epsg system like 31254.""" # source srs = osgeo.osr.SpatialReference() srs.SetWellKnownGeogCS('WGS84') # dest srd = osgeo.osr.SpatialReference() srd.ImportFromEPSG(epsg) # transform ct = osgeo.osr.CoordinateTransformation(srs, srd) x, y, z = ct.TransformPoint(lon, lat) return x, y def lonlat_to_epsg31254(lon, lat): """lon/lat in WGS84 to MGI_Austria_GK_West (EPSG:31254) conversion. >>> lonlat_to_epsg31254(11.39, 47.29) (80255.74881170085, 239568.73826148175)""" return lonlat_to_epsg(lon, lat, 31254) def lonlat_to_utm32n(lon, lat): """lon/lat in WGS84 to 'UTM Zone 32 North' (EPSG:32632) conversion. >>> lonlat_to_utm32n(11.39, 47.29) (680711.5152423368, 5240161.832015872)""" return lonlat_to_epsg(lon, lat, 32632) def interpolate_bilinear(fx, fy, z00, z10, z01, z11): """Interpolates the elevation between the values z00, z01, z10 and z11. :param fx: fraction between "left" (0) and "right" (1) in x-direction :param fy: fraction between "bottom" (0) and "top" (1) in y-direction :param z00: elevation in the "left bottom" edge (where fx=0, fy=0) :param z10: elevation in the "right bottom" edge (where fx=1, fy=0) :param z01: elevation in the "left upper" edge (where fx=0, fy=1) :param z11: elevation in the "right upper" edge (where fx=1, fy=1) """ return z00*(1-fx)*(1-fy) + z10*fx*(1-fy) + z01*(1-fx)*fy + z11*fx*fy def degree_to_dms(degree): d = int(degree) r = (degree - d) * 60 m = int(r) s = (r - m) * 60 return '{:2d}d{:02d}\'{:4.2f}"'.format(d, m, s) # GDAL supported raster files # --------------------------- # http://www.gdal.org/gdal_datamodel.html says: # "Note that the pixel/line coordinates in the above are from (0.0,0.0) at the top left corner of the top left pixel # to (width_in_pixels, height_in_pixels) at the bottom right corner of the bottom right pixel. # The pixel/line location of the center of the top left pixel would therefore be (0.5,0.5)." def gdal_info(filename): """Returns similar information as the command line program gdalinfo. This can be used to assist adding a new raster based source file to this library.""" dataset = osgeo.gdal.Open(filename, GA_ReadOnly) print('raster size X: {} pixel'.format(dataset.RasterXSize)) print('raster size Y: {} pixel'.format(dataset.RasterYSize)) s_srs = osgeo.osr.SpatialReference() s_srs.ImportFromWkt(dataset.GetProjectionRef()) t_srs = osgeo.osr.SpatialReference() t_srs.ImportFromEPSG(4326) # note that gdal_info reports lon/lat coordinates with the reference system t_srs = s_srs.CloneGeogCS() # In general, this gives slightly different results than t_srs.ImportFromEPSG(4326) used here. coord_trans = osgeo.osr.CoordinateTransformation(s_srs, t_srs) gt = dataset.GetGeoTransform() # create multipoint containing the four edges in lon/lat multipoint = osgeo.ogr.Geometry(osgeo.ogr.wkbMultiPoint) for e in range(4): i = [0, dataset.RasterXSize][e % 2] # horizontal pixel j = [0, dataset.RasterYSize][e // 2] # vertical pixel x, y = osgeo.gdal.ApplyGeoTransform(gt, i, j) # map units (probably meter or something like that) lon, lat, ele = coord_trans.TransformPoint(x, y) point = osgeo.ogr.Geometry(osgeo.ogr.wkbPoint) point.AddPoint(lon, lat) multipoint.AddGeometry(point) env = multipoint.GetEnvelope() print('bounding box: east {}°, west {}°, south {}°, north {}°'.format(*env)) def gdal_xy_to_ele(x, y, filename, band=1): """Returns the (linearly interpolated) elevation for the given point in the datasource specified by its filename. :param x: horizontal value in the coordinate system specified by the datasource :param y: vertical value in the coordinate system specified by the datasource :param filename: filename of datasource :param band: band of the datasource containing the elevation data :return: Elevation as float or None Example: >>> gdal_xy_to_ele(76665, 241185, 'ibk_10m.asc') 2253.0 """ dataset = osgeo.gdal.Open(filename, GA_ReadOnly) assert dataset is not None assert band <= dataset.RasterCount band = dataset.GetRasterBand(band) gt = dataset.GetGeoTransform() # gt == (xleft, xscale, xrot, ytop, yscale, yrot) igt = osgeo.gdal.InvGeoTransform(gt)[1] pixel, line = osgeo.gdal.ApplyGeoTransform(igt, x, y) # pixel and line are float values if pixel < 0 or pixel >= dataset.RasterXSize or line < 0 or line >= dataset.RasterYSize: return None # out of bounding box # nearest pixel (spixel, sline) left & above of the position specified by (pixel, line) # the 0.5 in the formula is because the center of pixels is in the middle of the pixels. spixel, sline = int(pixel-0.5), int(line-0.5) ele_str = band.ReadRaster(spixel, sline, 2, 2) raster_format = {GDT_Int32: '=4i', GDT_Float32: '=4f'}[band.DataType] ele_edges = struct.unpack(raster_format, ele_str) # [ele[spixel, sline], ele[spixel+1, sline], ele[spixel, sline+1], ele[spixel+1, sline+2]] if band.GetNoDataValue() in ele_edges: return None return interpolate_bilinear(pixel-0.5-spixel, line-0.5-sline, *ele_edges) # WMS # --- # The WMS 1.3 specification says about bounding boxes: # The relation of the Bounding Box to the map pixel matrix is that the bounding box goes around the “outside” # of the pixels of the map rather than through the centres of the map’s border pixels. # In this context, individual pixels represent an area on the ground. def wms_get_capabilities(url): # create URL # url = 'http://vogis.cnv.at/mapserver/mapserv?map=i_hoehen_und_gelaende_r_wms.map&' us = urllib.parse.urlsplit(url) query = urllib.parse.parse_qs(us.query) query.update({'VERSION': '1.0.3', 'REQUEST': 'GetCapabilities', 'SERVICE': 'WMS'}) gc_url = urllib.parse.SplitResult(us.scheme, us.netloc, us.path, urllib.parse.urlencode(query, doseq=True), us.fragment).geturl() # request f = urllib.request.urlopen(gc_url) xml = f.read() f.close() # parse response XML wms_cap = et.fromstring(xml) return wms_cap def wms_xy_to_qbbox(x, y, bbox, width, height): """Returns the bounding box of the 4 pixel surrounding the given coordinates.""" x_idx = width * (x - bbox.minx) / (bbox.maxx - bbox.minx) - 0.5 # index of pixel of total map image in x direction (float) y_idx = height * (y - bbox.miny) / (bbox.maxy - bbox.miny) - 0.5 # index of pixel of total map image in y direction (float) x0_idx = int(x_idx) x1_idx = x0_idx + 2 y0_idx = int(y_idx) y1_idx = y0_idx + 2 qbbox = BBox(minx=x0_idx * (bbox.maxx - bbox.minx) / width + bbox.minx, maxx=x1_idx * (bbox.maxx - bbox.minx) / width + bbox.minx, miny=y0_idx * (bbox.maxy - bbox.miny) / height + bbox.miny, maxy=y1_idx * (bbox.maxy - bbox.miny) / height + bbox.miny) return qbbox def wms_ij_to_ele(url, layer, crs, bbox, width, height, i, j): """Returns the elevation via a getfeatureinfo request of the pixels specified by i and j within the range specified by bbox, width and height.""" us = urllib.parse.urlsplit(url) query = urllib.parse.parse_qs(us.query) query.update({'VERSION': '1.3.0', 'REQUEST': 'GetFeatureInfo', 'LAYERS': layer, 'STYLES': '', 'CRS': crs, 'BBOX': ','.join(map(str, [bbox.minx, bbox.miny, bbox.maxx, bbox.maxy])), 'WIDTH': str(width), 'HEIGHT': str(height), 'FORMAT': 'image/png', 'QUERY_LAYERS': layer, 'INFO_FORMAT': 'application/vnd.ogc.gml', 'I': str(i), 'J': str(j)}) fi_url = urllib.parse.SplitResult(us.scheme, us.netloc, us.path, urllib.parse.urlencode(query, doseq=True), us.fragment).geturl() # request f = urllib.request.urlopen(fi_url) xml = f.read() f.close() # parse XML root = et.fromstring(xml) return float(root.find('.//value_0').text) def wms_xy_to_ele(x, y, url, layer, crs, bbox, width, height): """Returns the (linearly interpolated) elevation for the given point from the WMS. :param x: horizontal value in the native coordinate system of used layer of the WMS service :param y: vertical value in the native coordinate system of used layer of the WMS service :param url: WMS URL :param layer: WMS layer name :param crs: native coordinate reference system, e.g. 'EPSG:31254' :param bbox: layer bounding box in native coordinate system of used layer of the WMS service (instance of BBox) :param width: number of "pixel" in bbox in x direction :param height: number of "pixel" in bbox in y direction :return: Elevation as float or None Example: >>> wms_xy_to_ele(80000.0, 240000.0, url) 1129. """ # crs=EPSG:31254&dpiMode=7&format=image/png&layers=hoehenmodell_2011_gelaende_50cm&styles=&url=http://vogis.cnv.at/mapserver/mapserv?map%3Di_hoehen_und_gelaende_r_wms.map%26version%3D1.3.0%26 # find qbbox (query bounding box) containing 4 pixels surrounding x/y and centers of 4 surrounding pixels. # qbbox = wms_xy_to_qbbox(x, y, bbox, width, height) ele_list = [] for i in range(2): for j in range(1, -1, -1): ele = wms_ij_to_ele( 'http://vogis.cnv.at/mapserver/mapserv?map=i_hoehen_und_gelaende_r_wms.map', 'hoehenmodell_2011_gelaende_50cm', crs, qbbox, 2, 2, i, j) if ele is None: return None ele_list.append(ele) cbbox = BBox( minx = qbbox.minx + (qbbox.maxx-qbbox.minx)/4, maxx = qbbox.minx + (qbbox.maxx-qbbox.minx)*3/4, miny = qbbox.miny + (qbbox.maxy-qbbox.miny)/4, maxy = qbbox.miny + (qbbox.maxy-qbbox.miny)*3/4) # this bbox has the edges at the pixel centers return interpolate_bilinear((x-cbbox.minx)/(cbbox.maxx-cbbox.minx), (y-cbbox.miny)/(cbbox.maxy-cbbox.miny), *ele_list) # Map sources # ----------- class MapSource: region = None # Human readable string shortly describing the covered region source = None # Short description of data provider (string) attribution = None # attribution string resolution_m = None # (mean) distance between two samples in meter (float) bbox = None # bounding box of data expressed in longitude/latitude (BBox) epsg = None # native EPSG number, e.g. 31287 for 'EPSG:31287' class MapSourceFile(MapSource): filename = None # filename including path of map data class MapSourceWmf(MapSource): url = None # url of WMS service layer = None # WMS layer Name # Austria # ------- class MapSourceAustriaBasemap(MapSourceFile): region = 'Austria' source = 'basemap.at' resolution_m = 10. bbox = BBox(9.314479, 17.358544, 46.297834, 49.022520) filename = '/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif' epsg = 31287 def lonlat_to_ele_austria(lon, lat): """DEM Austria 10x10m EPSG:31287 C-BY-3.0: Land Kaernten - data.ktn.gv.at""" filename = '/daten/GeoData/dem/oesterreich_10m/dhm_lamb_10m.tif' x, y = lonlat_to_epsg(lon, lat, 31287) return gdal_xy_to_ele(x, y, filename) # Vorarlberg # ---------- class MapSourceWmfVorarlberg(MapSourceWmf): region = 'Vorarlberg' source = 'Landesamt fuer Vermessung und Geoinformation Vorarlberg' resolution_m = 0.5 bbox = BBox(9.33463, 10.333, 46.7549, 47.6586) url = 'http://vogis.cnv.at/mapserver/mapserv?map=i_hoehen_und_gelaende_r_wms.map&' layer = 'hoehenmodell_2011_oberflaeche_50cm' epsg = 31254 def wms_getfeatureinfo_vbg(x, y): """Uses a WMS GetFeatureInfo call to retrieve the DEM data. >>> print wms_getfeatureinfo_vbg(9.783502, 47.335013) """ # Note: a better (or at least alternative) way to implement this would be to use GDAL to do it for us. # # http://www.gdal.org/frmt_wms.html # "Starting with GDAL >= 2.0, WMS layers can be queried (through a GetFeatureInfo request) with the # gdallocationinfo utility, or with a GetMetadataItem("Pixel_iCol_iLine", "LocationInfo") call on a band object. # # # This would mean: # filename='dem_vogis.xml' # dataset = osgeo.gdal.Open(filename, GA_ReadOnly) # assert dataset is not None # assert band <= dataset.RasterCount # band = dataset.GetRasterBand(1) # band.GetMetadataItem("GeoPixel_{}_{}".format(1000, 1000), "LocationInfo") # # No query is executed and None is returned. # # Using gdallocationinfo just returns the RGB values 255. # The third alternative also just returns the RGB value: # print dem.gdal_xy_to_ele(-100, 200000, 'dem_vogis.xml', 1) # 255 delta = 0.000001 xl = str(x-delta) xr = str(x+delta) yb = str(y-delta) yt = str(y+delta) parameter = [ ('map', 'i_hoehen_und_gelaende_r_wms.map'), ('SERVICE', 'WMS'), ('VERSION', '1.1.1'), ('REQUEST', 'GetFeatureInfo'), ('BBOX', ','.join([xl, yb, xr, yt])), ('SRS', 'EPSG:4326'), ('WIDTH', '3'), ('HEIGHT', '3'), ('LAYERS', 'hoehenmodell_terrain_1m'), ('QUERY_LAYERS', 'hoehenmodell_terrain_1m'), ('INFO_FORMAT', 'gml'), ('X', '1'), ('Y', '1')] url = 'http://vogis.cnv.at/mapserver/mapserv?' + urllib.parse.urlencode(parameter) f = urllib.request.urlopen(url) xml = f.read() f.close() root = et.fromstring(xml) elevation = float(root.find('hoehenmodell_terrain_1m_layer/hoehenmodell_terrain_1m_feature/value_0').text) return elevation def wrgeo_to_ele_vbg(wrgeo): """Example: >>> wrgeo_to_ele_vbg('47.435870 N 10.085163 E') 1426.52 """ x, y = wrgeo_to_lonlat(wrgeo) return wms_getfeatureinfo_vbg(x, y) # Suedtirol # --------- # http://geoservices.buergernetz.bz.it/geoserver/p_bz-elevation/ows?SERVICE=WMS&VERSION=1.3.0&REQUEST=GetFeatureInfo&BBOX=2922671.81412236206233501,1209060.08528055297210813,2926426.12536783283576369,1212054.986808244837448&CRS=EPSG:2056&WIDTH=1142&HEIGHT=911&LAYERS=DTM-2p5m&STYLES=&FORMAT=image/png&QUERY_LAYERS=DTM-2p5m&INFO_FORMAT=application/json&I=79&J=30 def utm32n_to_mapcode(utm_x, utm_y): """The mapcode is the filename of the map part without extension. >>> utm32n_to_mapcode(705033, 5175081) 'dtm_utm10161' """ # East # 604998.750 # left edge of 09013 # 608201.250 # right edge of 09013 # 608198.750 # left edge of 09012 # 611401.250 # right edge of 09012 # 611398.750 # left edge of 09023 # 614601.250 # right edge of 09023 east_i = int((utm_x-605000) // 3200 + 2) east_major = east_i // 2 east_minor = east_i % 2 # North # 5128401.250 # upper edge of 02113 # 5125598.750 # lower edge of 02113 # 5125601.250 # upper edge of 01114 # 5122798.750 # lower edge of 01114 # 5122801.250 # upper edge of 01113 # 5119998.750 # lower edge of 01113 north_i = int((utm_y-5120000) // 2800 + 2) north_major = north_i // 2 north_minor = north_i % 2 minor = {(1, 1): 1, (1, 0): 2, (0, 0): 3, (0, 1): 4}[(east_minor, north_minor)] return 'dtm_utm{:02d}{:02d}{:d}'.format(north_major, east_major, minor) def wrgeo_to_ele_stirol(wrgeo): lon, lat = wrgeo_to_lonlat(wrgeo) utm_x, utm_y = lonlat_to_utm32n(lon, lat) mapcode = utm32n_to_mapcode(utm_x, utm_y) # Unpack zip archive in /tmp if not os.path.exists(os.path.join('/tmp', mapcode + '.ascii')): shutil.copy(os.path.join('/daten/GeoData/dem/suedtirol', mapcode + '.zip'), '/tmp') zip = zipfile.ZipFile(os.path.join('/tmp', mapcode + '.zip'), 'r') zip.extractall('/tmp') # Get elevation return gdal_xy_to_ele(utm_x, utm_y, os.path.join('/tmp', mapcode + '.ascii')) # main program # ------------ def main(wrgeo, no_wms): lon, lat = wrgeo_to_lonlat(wrgeo) ele = lonlat_to_ele_austria(lon, lat) ele_str = str(ele) if ele is None else '{:.0f} m'.format(ele) print('lat: {:.6f}° lon: {:0.6f}° ele: {}'.format(lat, lon, ele_str)) if __name__ == '__main__': parser = argparse.ArgumentParser('Get most accurate elevation for a given point from predefined GDAL data sources or WMS sources') parser.add_argument('--no-wms', help='don\'t use WMF services') parser.add_argument('wrgeo', help='coordinate like "47.275712 N 11.3456496 E"') args = parser.parse_args() main(args.wrgeo, args.no_wms)