"""Contains functions that maintain/update the cache tables.""" import urllib.parse import urllib.request import json import mwparserfromhell import sqlalchemy.engine from sqlalchemy import schema, Table from sqlalchemy.engine import Connection from sqlalchemy.sql import select from sqlalchemy.sql.expression import func as sqlfunc from osgeo import ogr from wrpylib import mwdb, wrmwdb, wrmwmarkup, wrvalidators class UpdateCacheError(RuntimeError): pass def _get_mw_text(connection: Connection, text: Table, content_address: str) -> str: parts = content_address.split(':') # e.g. 'tt:15664' if len(parts) != 2 or parts[0] != 'tt': raise ValueError('Content has unexpected format') old_id = int(parts[1]) query = select([text], text.c.old_id == old_id) text_row = connection.execute(query).fetchone() return text_row.old_text def update_wrsledruncache(connection): """Updates the wrsledruncache table from the wiki. If convert errors occur, an UpdateCacheError exception is raised. No other exception type should be raised under normal circumstances. >>> from sqlalchemy.engine import create_engine >>> engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4') >>> update_wrsledruncache(engine.connect()) """ metadata = schema.MetaData() wrsledruncache = wrmwdb.wrsledruncache_table(metadata) page = mwdb.page_table(metadata) categorylinks = mwdb.categorylinks_table(metadata) slots = mwdb.slots_table(metadata) content = mwdb.content_table(metadata) text = mwdb.text_table(metadata) class Sledrun: pass try: with connection.begin(): # Query all sled runs q = select( [page, categorylinks, slots, content], (page.c.page_latest == slots.c.slot_revision_id) & (slots.c.slot_content_id == content.c.content_id) & (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Rodelbahn')) sledrun_pages = connection.execute(q) # Delete all existing entries in wrsledruncache # We rely on transactions MySQL InnoDB connection.execute(wrsledruncache.delete()) # Refill wrsledruncache table for sledrun_page in sledrun_pages: old_text = _get_mw_text(connection, text, sledrun_page.content_address) rodelbahnbox = wrvalidators.rodelbahnbox_from_str(old_text) sledrun = wrmwmarkup.sledrun_from_rodelbahnbox(rodelbahnbox, Sledrun()) sledrun.page_id = sledrun_page.page_id sledrun.page_title = sledrun_page.page_title sledrun.name_url = wrvalidators.sledrun_page_title_to_pretty_url(sledrun_page.page_title) sledrun.under_construction = connection.execute(select( [sqlfunc.count()], (categorylinks.c.cl_from == sledrun_page.page_id) & (categorylinks.c.cl_to == 'In_Arbeit')).alias('x')).fetchone()[0] > 0 connection.execute(wrsledruncache.insert(sledrun.__dict__)) except ValueError as e: error_msg = f"Error at sled run '{sledrun_page.page_title}': {e}" raise UpdateCacheError(error_msg, sledrun_page.page_title, e) def update_wrinncache(connection): """Updates the wrinncache table from the wiki. If convert errors occur, an UpdateCacheError exception is raised. No other exception type should be raised under normal circumstances. >>> from sqlalchemy.engine import create_engine >>> engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4') >>> update_wrinncache(engine.connect()) """ metadata = schema.MetaData() wrinncache = wrmwdb.wrinncache_table(metadata) page = mwdb.page_table(metadata) categorylinks = mwdb.categorylinks_table(metadata) slots = mwdb.slots_table(metadata) content = mwdb.content_table(metadata) text = mwdb.text_table(metadata) class Inn: pass try: with connection.begin(): # Query all inns q = select( [page, categorylinks, slots, content], (page.c.page_latest == slots.c.slot_revision_id) & (slots.c.slot_content_id == content.c.content_id) & (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Gasthaus')) inn_pages = connection.execute(q) # Delete all existing entries in wrinncache # We rely on transactions MySQL InnoDB connection.execute(wrinncache.delete()) # Refill wrinncache table for inn_page in inn_pages: old_text = _get_mw_text(connection, text, inn_page.content_address) gasthausbox = wrvalidators.gasthausbox_from_str(old_text) inn = wrmwmarkup.inn_from_gasthausbox(gasthausbox, Inn()) inn.page_id = inn_page.page_id inn.page_title = inn_page.page_title inn.under_construction = connection.execute(select( [sqlfunc.count()], (categorylinks.c.cl_from == inn_page.page_id) & (categorylinks.c.cl_to == 'In_Arbeit')).alias('x')) \ .fetchone()[0] > 0 # it would be better to do this in the query above connection.execute(wrinncache.insert(inn.__dict__)) except ValueError as e: error_msg = f"Error as inn '{inn_page.page_title}': {e}" raise UpdateCacheError(error_msg, inn_page.page_title, e) def update_wrreportcache(connection, page_id=None): """Updates the wrreportcache table. :param connection: sqlalchemy connection :param page_id: Updates only the reportcache table for the sledrun described on the Winterrodeln wiki page with the specified page_id. Use None for this parameter to update the whole table. >>> from sqlalchemy.engine import create_engine >>> engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4') >>> update_wrreportcache(engine.connect()) """ metadata = schema.MetaData() wrreportcache = wrmwdb.wrreportcache_table(metadata) with connection.begin(): # Delete the datasets we are going to update sql_del = wrreportcache.delete() if page_id is not None: sql_del = sql_del.where(wrreportcache.c.page_id == page_id) connection.execute(sql_del) def insert_row(connection_, row_list_): if len(row_list_) == 0: return # Insert the report row_ = dict(row_list_[0]) connection_.execute(wrreportcache.insert(values=row_)) # Select the rows to update sql = 'select page_id, page_title, wrreport.id as report_id, date_report, `condition`, description, ' \ 'author_name, ' \ 'if(author_userid is null, null, author_username) as author_username from wrreport ' \ 'where {0}`condition` is not null and date_invalid > now() and delete_date is null ' \ 'order by page_id, date_report desc, date_entry desc' \ .format('' if page_id is None else f'page_id={page_id} and ') cursor = connection.execute(sql) page_id = None row_list = [] for row in cursor: if row.page_id != page_id: insert_row(connection, row_list) page_id = row.page_id row_list = [] row_list.append(row) insert_row(connection, row_list) def update_wrmapcache(connection): """Updates the wrmappointcache and wrmappathcache tables from the wiki. If convert errors occur, an UpdateCacheError exception is raised. No other exception type should be raised under normal circumstances. >>> from sqlalchemy.engine import create_engine >>> engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4') >>> # or: >>> # engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4&passwd=XXX') >>> update_wrmapcache(engine.connect()) """ metadata = schema.MetaData() page = mwdb.page_table(metadata) categorylinks = mwdb.categorylinks_table(metadata) slots = mwdb.slots_table(metadata) content = mwdb.content_table(metadata) text = mwdb.text_table(metadata) try: with connection.begin(): # Query all sledruns q = select( [page, categorylinks, slots, content], (page.c.page_latest == slots.c.slot_revision_id) & (slots.c.slot_content_id == content.c.content_id) & (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Rodelbahn')) sledrun_pages = connection.execute(q) # Delete all existing entries in wrmappointcache # We rely on transactions MySQL InnoDB connection.execute('delete from wrmappointcache') connection.execute('delete from wrmappathcache') # Refill wrmappointcache and wrmappathcache tables for sledrun_page in sledrun_pages: old_text = _get_mw_text(connection, text, sledrun_page.content_address) wikicode = mwparserfromhell.parse(old_text) wrmap_list = wikicode.filter_tags(recursive=False, matches=lambda tag: tag.tag == 'wrmap') if len(wrmap_list) == 0: continue # not wrmap in page if len(wrmap_list) > 1: raise UpdateCacheError( f'{len(wrmap_list)} entries found in article "{sledrun_page.page_title}"') wrmap = wrmap_list[0] geojson = wrmwmarkup.parse_wrmap(str(wrmap)) for feature in geojson['features']: properties = feature['properties'] coordinates = feature['geometry']['coordinates'] # Points if properties['type'] in wrmwmarkup.WRMAP_POINT_TYPES: lon, lat = coordinates label = properties.get('name') point_types = { 'gasthaus': 'hut', 'haltestelle': 'busstop', 'parkplatz': 'carpark', 'achtung': 'warning', 'foto': 'photo', 'verleih': 'rental', 'punkt': 'point' } point_type = point_types[properties['type']] sql = 'insert into wrmappointcache (page_id, type, point, label) values (%s, %s, POINT(%s, %s), %s)' connection.execute(sql, (sledrun_page.page_id, point_type, lon, lat, label)) # Paths elif properties['type'] in wrmwmarkup.WRMAP_LINE_TYPES: path_types = { 'rodelbahn': 'sledrun', 'gehweg': 'walkup', 'alternative': 'alternative', 'lift': 'lift', 'anfahrt': 'recommendedcarroute', 'linie': 'line'} path_type = path_types[properties['type']] path = ", ".join([f"{lon} {lat}" for lon, lat in coordinates]) path = f'LineString({path})' if path_type == 'recommendedcarroute': continue sql = 'insert into wrmappathcache (path, page_id, type) values (GeomFromText(%s), %s, %s)' connection.execute(sql, (path, sledrun_page.page_id, path_type)) else: raise RuntimeError(f'Unknown feature type {properties["type"]}') except RuntimeError as e: error_msg = f"Error at sledrun '{sledrun_page.page_title}': {e}" raise UpdateCacheError(error_msg, sledrun_page.page_title, e) def update_wrregioncache(connection): """Updates the wrregioncache table from the wiki. It relays on the table wrsledruncache to be up-to-date. No exceptions should be raised under normal circumstances. >>> from sqlalchemy.engine import create_engine >>> engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4') >>> # or: >>> # engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4&passwd=XXX') >>> update_wrregioncache(engine.connect()) """ metadata = schema.MetaData() wrregion = wrmwdb.wrregion_table(metadata) wrsledruncache = wrmwdb.wrsledruncache_table(metadata) wrregioncache = wrmwdb.wrregioncache_table(metadata) with connection.begin(): # Delete all existing entries in wrregioncache # We rely on transactions MySQL InnoDB connection.execute(wrregioncache.delete()) # Query all combinations of sledruns and regions sel = select( [ wrregion.c.id.label('region_id'), sqlfunc.AsWKB(wrregion.c.border).label('border'), wrsledruncache.c.page_id, wrsledruncache.c.position_longitude, wrsledruncache.c.position_latitude ], sqlfunc.contains( wrregion.c.border, sqlfunc.point(wrsledruncache.c.position_longitude, wrsledruncache.c.position_latitude) ) ) ins = wrregioncache.insert() # Refill wrregioncache point = ogr.Geometry(ogr.wkbPoint) result = connection.execute(sel) for row in result: point.SetPoint(0, row.position_longitude, row.position_latitude) if point.Within(ogr.CreateGeometryFromWkb(row.border)): connection.execute(ins.values(region_id=row.region_id, page_id=row.page_id)) def update_wrsledrunjsoncache(api_url: urllib.parse.ParseResult, connection: sqlalchemy.engine.Connection): """Updates the wrsledrunjsoncache from JSON files in the wiki. :param api_url: URL to the API, e.g. https://www.winterrodeln.org/mediawiki/api.php :param connection: open sqlalchemy connection """ with connection.begin(): # connection.execute('truncate table wrsledrunjsoncache') # needs additional permissions connection.execute('delete from wrsledrunjsoncache') cmcontinue = '' while True: request = api_url._replace(query='action=query&list=categorymembers&cmtitle=Kategorie:Rodelbahn' f'&cmlimit=50&format=json&cmcontinue={cmcontinue}') # cmlimit=max (5000) response = urllib.request.urlopen(request.geturl()) sledrun_list_response = json.load(response) rows_to_insert = sledrun_list_response['query']['categorymembers'] for row in rows_to_insert: row['sledrun_title'] = f'{row["title"]}/Rodelbahn.json' row['map_title'] = f'{row["title"]}/Landkarte.json' qs_titles = urllib.parse.urlencode({"titles": '|'.join(r['sledrun_title'] for r in rows_to_insert)}) request = api_url._replace(query='action=query&prop=revisions&rvprop=content&format=json&rvslots=main&' + qs_titles) response = urllib.request.urlopen(request.geturl()) sledrun_json_list_response = json.load(response) sledrun_json_dict = {s['title']: s for s in sledrun_json_list_response['query']['pages'].values()} qs_titles = urllib.parse.urlencode({"titles": '|'.join(r['map_title'] for r in rows_to_insert)}) request = api_url._replace(query='action=query&prop=revisions&rvprop=content&format=json&rvslots=main&' + qs_titles) response = urllib.request.urlopen(request.geturl()) map_json_list_response = json.load(response) map_json_dict = {m['title']: m for m in map_json_list_response['query']['pages'].values()} for row in rows_to_insert: sledrun_json = sledrun_json_dict.get(row['sledrun_title']) if 'missing' not in sledrun_json: row['sledrun_pageid'] = sledrun_json['pageid'] row['sledrun'] = sledrun_json['revisions'][0]['slots']['main']['*'] map_json = map_json_dict.get(row['map_title']) if 'missing' not in map_json: row['map_pageid'] = map_json['pageid'] row['map'] = map_json['revisions'][0]['slots']['main']['*'] sql = 'insert into wrsledrunjsoncache ' \ '(sledrun_page_id, sledrun_json_page_id, sledrun_json, map_json_page_id, map_json) ' \ 'values (%s, %s, %s, %s, %s)' for row in rows_to_insert: connection.execute(sql, (row['pageid'], row.get('sledrun_pageid'), row.get('sledrun'), row.get('map_pageid'), row.get('map'))) if 'continue' not in sledrun_list_response: break cmcontinue = sledrun_list_response['continue']['cmcontinue']