#!/usr/bin/python3 import argparse import configparser import json import sys from typing import List, Tuple, Iterable, NamedTuple from xml.etree.ElementTree import ElementTree import geojson import shapely import sqlalchemy from geojson import Feature, GeoJSON from shapely.geometry import shape from sqlalchemy import create_engine from termcolor import cprint from wrpylib.cli_tools import unified_diff, input_yes_no_quit, Choice from wrpylib.json_tools import format_json from wrpylib.mwapi import WikiSite from wrpylib.wrgeojson import join_wrgeojson_ways, WrLineStringFeatureType, get_geometry_center, \ LanToToMetricTransformer, DEFAULT_MAX_DIST_ERROR_M, get_geometry_envelope, get_geometry_points from wrpylib.wrosm import find_sledrun_relations, convert_osm_to_geojson, DeRefError, tags def filter_way(feature: dict, way_type: str) -> bool: return feature['geometry']['type'] == 'LineString' and feature.get('properties', {}).get('type') == way_type def feature_distance(a: Feature, b: Feature) -> float: return shape(a['geometry']).hausdorff_distance(shape(b['geometry'])) def geojson_distance(a: GeoJSON, b: GeoJSON) -> float: return get_geometry_points(a).hausdorff_distance(get_geometry_points(b)) def geojson_way_sort_key(feature: Feature, reference_feature: Feature) -> Tuple[str, float]: return feature.get('properties', {}).get('type', ''), feature_distance(feature, reference_feature) def get_db_engine(config: configparser.ConfigParser) -> sqlalchemy.engine.Engine: host = config.get('mysql', 'host') dbname = config.get('mysql', 'dbname') user = config.get('mysql', 'user_name') passwd = config.get('mysql', 'user_pass') return create_engine(f'mysql://{user}@{host}:3306/{dbname}?passwd={passwd}&charset=utf8mb4') class SimpleSledrunMap(NamedTuple): map_title: str map_json: GeoJSON map_bbox: shapely.geometry.Polygon def get_simplified_wrgeojson_from_db(connection: sqlalchemy.engine.Connection) -> Iterable[SimpleSledrunMap]: cursor = connection.execute("select page_title, map_json from wrsledrunjsoncache " "left join page on page_id = map_json_page_id " "where json_extract(map_json, '$.wr_properties.simplified') order by page_title") for row in cursor: map_title = row['page_title'].decode('utf-8') wrgeojson = geojson.loads(row['map_json']) map_bbox = get_geometry_envelope(wrgeojson) yield SimpleSledrunMap(map_title, wrgeojson, map_bbox) class OsmWrgeojson(NamedTuple): name: str bbox: shapely.geometry.Polygon wrgeojson: GeoJSON def get_osm_wrgeojson_from_xml(osm_tree: ElementTree) -> Iterable[OsmWrgeojson]: for osm_sledrun in find_sledrun_relations(osm_tree): name = tags(osm_sledrun).get('name') print(f'Calculate bbox of {name}') try: wrgeojson = convert_osm_to_geojson(osm_tree, osm_sledrun) join_wrgeojson_ways(wrgeojson) map_bbox = get_geometry_envelope(wrgeojson) yield OsmWrgeojson(name, map_bbox, wrgeojson) except DeRefError: print('Error: Incomplete XML - please load larger region.') except ValueError as e: print(e) def intersecting_wrgeojson(bbox: shapely.geometry.Polygon, candidates: Iterable[OsmWrgeojson]) \ -> Iterable[OsmWrgeojson]: for osm_wrgeojson in candidates: if bbox.intersects(osm_wrgeojson.bbox): yield osm_wrgeojson def update_sledrun_wrgeojson_to_highres(simple_sledrun_map: SimpleSledrunMap, osm_sledrun_map: OsmWrgeojson, max_dist_m: float, site: WikiSite) -> bool: wrgeojson_page = site.query_page(simple_sledrun_map.map_title) wrgeojson_page_content = wrgeojson_page['revisions'][0]['slots']['main']['content'] wr_wrgeojson = geojson.loads(wrgeojson_page_content) osm_wrgeojson = osm_sledrun_map.wrgeojson center = get_geometry_center(wr_wrgeojson) transformer = LanToToMetricTransformer(center.coordinates[1]) for way_type in WrLineStringFeatureType: wr_ways = [f for f in wr_wrgeojson['features'] if filter_way(f, way_type.value)] osm_ways = [f for f in osm_wrgeojson['features'] if filter_way(f, way_type.value)] if len(wr_ways) != len(osm_ways): cprint(f'{simple_sledrun_map.map_title} does not have the same number of {way_type.value} features', 'red') return False for reference_feature in wr_wrgeojson['features']: reference_feature_type: str = reference_feature.get('properties', {}).get('type', '') if reference_feature_type not in [v.value for v in WrLineStringFeatureType]: continue osm_features: List[Feature] = [f for f in osm_wrgeojson['features'] if filter_way(f, reference_feature_type)] osm_features = sorted(osm_features, key=lambda f: geojson_way_sort_key(f, reference_feature)) osm_feature = osm_features[0] osm_feature_m = transformer.geojson_lon_lat_to_metric(osm_feature) reference_feature_m = transformer.geojson_lon_lat_to_metric(reference_feature) assert isinstance(osm_feature_m, Feature) assert isinstance(reference_feature_m, Feature) dist_m = feature_distance(osm_feature_m, reference_feature_m) if dist_m > max_dist_m: cprint(f'Distance of ways "{reference_feature_type}" too big ({dist_m} m > {max_dist_m}).', 'red') return False reference_feature['geometry'] = osm_feature['geometry'] wr_wrgeojson_old = json.loads(wrgeojson_page_content) if wr_wrgeojson_old == wr_wrgeojson: cprint('No changes detected', 'red') return False wr_wrgeojson['wr_properties']['simplified'] = False wr_wrgeojson_old_str = format_json(wr_wrgeojson_old) wr_wrgeojson_str = format_json(wr_wrgeojson) unified_diff(wr_wrgeojson_old_str, wr_wrgeojson_str) choice = input_yes_no_quit('Do you accept the changes [yes, no, quit]? ', None) if choice == Choice.no: return False elif choice == Choice.quit: sys.exit(0) site( 'edit', pageid=wrgeojson_page['pageid'], text=wr_wrgeojson_str, summary='Höhere Auflösung der Wege erstellt.', minor=1, bot=1, baserevid=wrgeojson_page['revisions'][0]['revid'], nocreate=1, token=site.token(), ) return True def update_wrgeojson_to_highres_batch(osm_file: str, ini_files: List[str]): config = configparser.ConfigParser() config.read(ini_files) engine = get_db_engine(config) with engine.connect() as connection: simplified_wrgeojson_from_db = list(get_simplified_wrgeojson_from_db(connection)) osm_tree = ElementTree() osm_tree.parse(osm_file) osm_wrgeojson_list = list(get_osm_wrgeojson_from_xml(osm_tree)) site = WikiSite(ini_files) max_dist = DEFAULT_MAX_DIST_ERROR_M * 1.5 for simple_sledrun_map in simplified_wrgeojson_from_db: cprint(f'Processing {simple_sledrun_map.map_title}', 'green') candidates = list(intersecting_wrgeojson(simple_sledrun_map.map_bbox, osm_wrgeojson_list)) if len(candidates) == 0: print('No candidates, skipping') else: print(f'{len(candidates)} candidate(s)') for candidate in sorted(candidates, key=lambda c: geojson_distance(c.wrgeojsonyes, simple_sledrun_map.map_json)): if update_sledrun_wrgeojson_to_highres(simple_sledrun_map, candidate, max_dist, site): break def main(): parser = argparse.ArgumentParser( description='Updates sledrun wrgeojson maps with higher resolution ways from OSM files.') parser.add_argument('osm_file', help='OSM file containing many sledruns with high resolution ways.') parser.add_argument('ini_file', nargs='+', help='inifile.ini, see: https://www.winterrodeln.org/trac/wiki/ConfigIni') args = parser.parse_args() update_wrgeojson_to_highres_batch(args.osm_file, args.ini_file) if __name__ == '__main__': main()