Better formatting and add some type hints.
authorPhilipp Spitzer <philipp@spitzer.priv.at>
Thu, 5 Aug 2021 20:39:12 +0000 (22:39 +0200)
committerPhilipp Spitzer <philipp@spitzer.priv.at>
Thu, 5 Aug 2021 20:39:12 +0000 (22:39 +0200)
wrpylib/mwdb.py
wrpylib/mwmarkup.py
wrpylib/wrmwcache.py
wrpylib/wrmwdb.py
wrpylib/wrmwmarkup.py
wrpylib/wrvalidators.py

index 6dc60ebd54f64bcca1d694583aca12c82faedbb2..d522d8685246f9f8c3d1b8dd004155d6d375214e 100644 (file)
@@ -8,20 +8,21 @@ def page_table(metadata):
     """Returns the sqlalchemy Table representing the "page" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("page", metadata,
-    Column("page_id", types.Integer, primary_key=True),
-    Column("page_namespace", types.Integer, nullable=False),
-    Column("page_title", types.String(255, convert_unicode='force'), nullable=False),
-    Column("page_restrictions", types.String, nullable=False), # tinyblob NOT NULL
-    Column("page_is_redirect", types.Integer, nullable=False),
-    Column("page_is_new", types.Integer, nullable=False),
-    Column("page_random", types.Float, nullable=False),
-    Column("page_touched", types.String(14, convert_unicode='force'), nullable=False),
-    Column("page_latest", types.Integer, nullable=False),
-    Column("page_len", types.Integer, nullable=False),
-    Column("page_content_model", types.String(32, convert_unicode='force')),
-    Column("page_links_updated", types.String(14, convert_unicode='force')),
-    Column("page_lang", types.String(35, convert_unicode='force')),
+    return Table(
+        "page", metadata,
+        Column("page_id", types.Integer, primary_key=True),
+        Column("page_namespace", types.Integer, nullable=False),
+        Column("page_title", types.String(255, convert_unicode='force'), nullable=False),
+        Column("page_restrictions", types.String, nullable=False),  # tinyblob NOT NULL
+        Column("page_is_redirect", types.Integer, nullable=False),
+        Column("page_is_new", types.Integer, nullable=False),
+        Column("page_random", types.Float, nullable=False),
+        Column("page_touched", types.String(14, convert_unicode='force'), nullable=False),
+        Column("page_latest", types.Integer, nullable=False),
+        Column("page_len", types.Integer, nullable=False),
+        Column("page_content_model", types.String(32, convert_unicode='force')),
+        Column("page_links_updated", types.String(14, convert_unicode='force')),
+        Column("page_lang", types.String(35, convert_unicode='force')),
     )
 
 
@@ -29,18 +30,19 @@ def revision_table(metadata):
     """Returns the sqlalchemy Table representing the "revision" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("revision", metadata,
-    Column("rev_id", types.Integer, nullable=False, primary_key=True),
-    Column("rev_page", types.Integer, nullable=False, primary_key=True),
-    Column("rev_text_id", types.Integer, nullable=False),
-    Column("rev_comment", types.String(convert_unicode='force'), nullable=False), # tinyblob NOT NULL
-    Column("rev_user", types.Integer, nullable=False),
-    Column("rev_user_text", types.String(255, convert_unicode='force'), nullable=False),
-    Column("rev_timestamp", types.String(14, convert_unicode='force'), nullable=False),
-    Column("rev_minor_edit", types.Integer, nullable=False),
-    Column("rev_deleted", types.Integer, nullable=False),
-    Column("rev_len", types.Integer, nullable=False),
-    Column("rev_parent_id", types.Integer, nullable=False),
+    return Table(
+        "revision", metadata,
+        Column("rev_id", types.Integer, nullable=False, primary_key=True),
+        Column("rev_page", types.Integer, nullable=False, primary_key=True),
+        Column("rev_text_id", types.Integer, nullable=False),
+        Column("rev_comment", types.String(convert_unicode='force'), nullable=False),  # tinyblob NOT NULL
+        Column("rev_user", types.Integer, nullable=False),
+        Column("rev_user_text", types.String(255, convert_unicode='force'), nullable=False),
+        Column("rev_timestamp", types.String(14, convert_unicode='force'), nullable=False),
+        Column("rev_minor_edit", types.Integer, nullable=False),
+        Column("rev_deleted", types.Integer, nullable=False),
+        Column("rev_len", types.Integer, nullable=False),
+        Column("rev_parent_id", types.Integer, nullable=False),
     )
 
 
@@ -48,10 +50,11 @@ def text_table(metadata):
     """Returns the sqlalchemy Table representing the "text" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("text", metadata,
-    Column("old_id", types.Integer, primary_key=True),
-    Column("old_text", types.String(convert_unicode='force')),
-    Column("old_flags", types.String(convert_unicode='force')),
+    return Table(
+        "text", metadata,
+        Column("old_id", types.Integer, primary_key=True),
+        Column("old_text", types.String(convert_unicode='force')),
+        Column("old_flags", types.String(convert_unicode='force')),
     )
 
 
@@ -59,21 +62,22 @@ def user_table(metadata):
     """Returns the sqlalchemy Table representing the "user" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table('user', metadata,
-    Column("user_id", types.Integer, primary_key=True),
-    Column("user_name", types.String(255, convert_unicode='force'), nullable=False),
-    Column("user_real_name", types.String(255, convert_unicode='force'), nullable=False),
-    Column("user_password", types.UnicodeText, nullable=False),
-    # "user_newpassword"
-    # "user_newpass_time"
-    Column("user_email", types.String(convert_unicode='force'), nullable=False),
-    # "user_touched"
-    # "user_token"
-    # "user_email_authenticated"
-    # "user_email_token"
-    # "user_email_token_expires"
-    # "user_registration"
-    # "user_editcount"
+    return Table(
+        'user', metadata,
+        Column("user_id", types.Integer, primary_key=True),
+        Column("user_name", types.String(255, convert_unicode='force'), nullable=False),
+        Column("user_real_name", types.String(255, convert_unicode='force'), nullable=False),
+        Column("user_password", types.UnicodeText, nullable=False),
+        # "user_newpassword"
+        # "user_newpass_time"
+        Column("user_email", types.String(convert_unicode='force'), nullable=False),
+        # "user_touched"
+        # "user_token"
+        # "user_email_authenticated"
+        # "user_email_token"
+        # "user_email_token_expires"
+        # "user_registration"
+        # "user_editcount"
     )
 
 
@@ -83,7 +87,8 @@ def user_groups_table(metadata):
 
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table('user_groups', metadata,
+    return Table(
+        'user_groups', metadata,
         Column('ug_user', types.Integer, nullable=False, primary_key=True),
         Column('ug_group', types.String(255, convert_unicode='force'), nullable=False, primary_key=True),
         Column("ug_expiry", types.String(14, convert_unicode='force')),
@@ -94,9 +99,10 @@ def categorylinks_table(metadata):
     """Returns the sqlalchemy Table representing the "categorylinks" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("categorylinks", metadata,
-    Column("cl_from", types.Integer, nullable=False, primary_key=True),
-    Column("cl_to", types.String(255, convert_unicode='force'), nullable=False, primary_key=True),
-    Column("cl_sortkey", types.String(230, convert_unicode='force'), nullable=False),
-    Column("cl_timestamp", types.DateTime, nullable=False),
+    return Table(
+        "categorylinks", metadata,
+        Column("cl_from", types.Integer, nullable=False, primary_key=True),
+        Column("cl_to", types.String(255, convert_unicode='force'), nullable=False, primary_key=True),
+        Column("cl_sortkey", types.String(230, convert_unicode='force'), nullable=False),
+        Column("cl_timestamp", types.DateTime, nullable=False),
     )
index f65a995f12eb33aaadd9d1f9a6eb1d829f29abbc..83e29ec5ad2256634989574cfcc3a7dc3acfb784 100644 (file)
@@ -6,6 +6,9 @@ Other Python MediaWiki parsers:
 * mwlib http://code.pediapress.com/wiki/wiki
 * https://www.mediawiki.org/wiki/Alternative_parsers
 """
+from typing import Optional
+
+from mwparserfromhell.nodes import Template
 
 
 class ParseError(RuntimeError):
@@ -13,8 +16,8 @@ class ParseError(RuntimeError):
     pass
 
 
-def format_template_table(template, keylen=None):
-    """Reformat the given template to be tabular.
+def format_template_table(template: Template, keylen: Optional[int] = None):
+    """Reformat the given template to be tabular. The template is modified in-place.
 
     >>> template
     {{foo|bar|bazz=7}}
@@ -24,6 +27,7 @@ def format_template_table(template, keylen=None):
     | bazz = 7
     }}
 
+    :param template: MediaWiki template to be formatted
     :param keylen: length of the keys or None for automatic determination
     """
     if keylen is None:
@@ -40,9 +44,10 @@ def format_template_table(template, keylen=None):
             param.value = '\n'
 
 
-def format_template_oneline(template):
+def format_template_oneline(template: Template):
     """Formats a template like this: {{template_name|param| }}
-    (whitespace is stripped and empty parameters are replaced with one space)."""
+    (whitespace is stripped and empty parameters are replaced with one space).
+    The template is modified in-place."""
     template.name = template.name.strip()
     for param in template.params:
         if param.showkey:
@@ -53,7 +58,7 @@ def format_template_oneline(template):
         param.value = value
 
 
-def dbkey_to_title(value):
+def dbkey_to_title(value: str) -> str:
     """Converts a article database key to a article title. Private function secureAndSplit() of the Title class
     on line 3316 of includes/Title.php says:
     $this->mTextform = str_replace( '_', ' ', $this->mDbkeyform );
index 81f6b4eae5389e8cdb14db12d6458bcdde1fe40e..5cb4c1e526049d0b707d3d3b9ffbe9a2ca2980f5 100644 (file)
@@ -31,10 +31,16 @@ def update_wrsledruncache(connection):
     transaction = connection.begin()
 
     # Query all sled runs
-    q = select([page, categorylinks, revision, text], (page.c.page_latest==revision.c.rev_id) & (text.c.old_id==revision.c.rev_text_id) & (categorylinks.c.cl_from==page.c.page_id) & (categorylinks.c.cl_to=='Rodelbahn'))
+    q = select(
+        [page, categorylinks, revision, text],
+        (page.c.page_latest == revision.c.rev_id) & (text.c.old_id == revision.c.rev_text_id) &
+        (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Rodelbahn'))
     sledrun_pages = connection.execute(q)
     # Original SQL:
-    # sql = u"select page_id, rev_id, old_id, page_title, old_text, 'In_Arbeit' in (select cl_to from categorylinks where cl_from=page_id) as under_construction from page, revision, text, categorylinks where page_latest=rev_id and old_id=rev_text_id and cl_from=page_id and cl_to='Rodelbahn' order by page_title"
+    # sql = u"select page_id, rev_id, old_id, page_title, old_text, 'In_Arbeit' in
+    # (select cl_to from categorylinks where cl_from=page_id) as under_construction
+    # from page, revision, text, categorylinks where page_latest=rev_id and old_id=rev_text_id and
+    # cl_from=page_id and cl_to='Rodelbahn' order by page_title"
     
     # Delete all existing entries in wrsledruncache
     # We rely on transactions MySQL InnoDB
@@ -48,7 +54,10 @@ def update_wrsledruncache(connection):
             sledrun.page_id = sledrun_page.page_id
             sledrun.page_title = sledrun_page.page_title
             sledrun.name_url = wrvalidators.sledrun_page_title_to_pretty_url(sledrun_page.page_title)
-            sledrun.under_construction = connection.execute(select([sqlfunc.count()], (categorylinks.c.cl_from==sledrun_page.page_id) & (categorylinks.c.cl_to == 'In_Arbeit')).alias('x')).fetchone()[0] > 0
+            sledrun.under_construction = connection.execute(select(
+                [sqlfunc.count()],
+                (categorylinks.c.cl_from == sledrun_page.page_id) &
+                (categorylinks.c.cl_to == 'In_Arbeit')).alias('x')).fetchone()[0] > 0
             connection.execute(wrsledruncache.insert(sledrun.__dict__))
         except ValueError as e:
             transaction.rollback()
@@ -78,7 +87,10 @@ def update_wrinncache(connection):
     transaction = connection.begin()
 
     # Query all inns
-    q = select([page, categorylinks, revision, text], (page.c.page_latest==revision.c.rev_id) & (text.c.old_id==revision.c.rev_text_id) & (categorylinks.c.cl_from==page.c.page_id) & (categorylinks.c.cl_to=='Gasthaus'))
+    q = select(
+        [page, categorylinks, revision, text],
+        (page.c.page_latest == revision.c.rev_id) & (text.c.old_id == revision.c.rev_text_id) &
+        (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Gasthaus'))
     inn_pages = connection.execute(q)
         
     # Delete all existing entries in wrinncache
@@ -92,11 +104,15 @@ def update_wrinncache(connection):
             inn = wrmwmarkup.inn_from_gasthausbox(gasthausbox, Inn())
             inn.page_id = inn_page.page_id
             inn.page_title = inn_page.page_title
-            inn.under_construction = connection.execute(select([sqlfunc.count()], (categorylinks.c.cl_from==inn_page.page_id) & (categorylinks.c.cl_to == 'In_Arbeit')).alias('x')).fetchone()[0] > 0 # It would be better to do this in the query above
+            inn.under_construction = connection.execute(select(
+                [sqlfunc.count()],
+                (categorylinks.c.cl_from == inn_page.page_id) &
+                (categorylinks.c.cl_to == 'In_Arbeit')).alias('x')) \
+                .fetchone()[0] > 0  # it would be better to do this in the query above
             connection.execute(wrinncache.insert(inn.__dict__))
         except ValueError as e:
             transaction.rollback()
-            error_msg = "Error as inn '{0}': {1}".format(inn_page.page_title, str(e))
+            error_msg = f"Error as inn '{inn_page.page_title}': {e}"
             raise UpdateCacheError(error_msg, inn_page.page_title, e)
     transaction.commit()
 
@@ -117,37 +133,44 @@ def update_wrreportcache(connection, page_id=None):
 
     # Delete the datasets we are going to update
     sql_del = wrreportcache.delete()
-    if not page_id is None: sql_del = sql_del.where(wrreportcache.c.page_id == page_id)
+    if page_id is not None:
+        sql_del = sql_del.where(wrreportcache.c.page_id == page_id)
     connection.execute(sql_del)
 
-    def insert_row(connection, rowlist):
-        if len(rowlist) == 0: return
+    def insert_row(connection_, row_list_):
+        if len(row_list_) == 0:
+            return
         # Insert the report
-        row = dict(rowlist[0])
-        connection.execute(wrreportcache.insert(values=row))
+        row_ = dict(row_list_[0])
+        connection_.execute(wrreportcache.insert(values=row_))
 
     # Select the rows to update
-    sql = 'select page_id, page_title, wrreport.id as report_id, date_report, `condition`, description, author_name, if(author_userid is null, null, author_username) as author_username from wrreport where {0}`condition` is not null and date_invalid > now() and delete_date is null order by page_id, date_report desc, date_entry desc'.format('' if page_id is None else 'page_id={0} and '.format(page_id))
+    sql = 'select page_id, page_title, wrreport.id as report_id, date_report, `condition`, description, author_name, ' \
+          'if(author_userid is null, null, author_username) as author_username from wrreport ' \
+          'where {0}`condition` is not null and date_invalid > now() and delete_date is null ' \
+          'order by page_id, date_report desc, date_entry desc' \
+          .format('' if page_id is None else 'page_id={0} and '.format(page_id))
     cursor = connection.execute(sql)
     page_id = None
-    rowlist = []
+    row_list = []
     for row in cursor:
         if row.page_id != page_id:
-            insert_row(connection, rowlist)
+            insert_row(connection, row_list)
             page_id = row.page_id
-            rowlist = []
-        rowlist.append(row)
-    insert_row(connection, rowlist)
+            row_list = []
+        row_list.append(row)
+    insert_row(connection, row_list)
     transaction.commit()
 
 
 def update_wrmapcache(connection):
-    """Updates the wrmappointcache and wrmappathcache tables from the wiki. If convert errors occur, an UpdateCacheError exception
-    is raised. No other exception type should be raised under normal circumstances.
+    """Updates the wrmappointcache and wrmappathcache tables from the wiki. If conversion errors occur,
+    an UpdateCacheError exception is raised. No other exception type should be raised under normal circumstances.
     
     >>> from sqlalchemy.engine import create_engine
     >>> engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4')
-    >>> # or: engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4&passwd=XXXXX')
+    >>> # or:
+    >>> # engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4&passwd=XXX')
     >>> update_wrmapcache(engine.connect())
     """
     metadata = schema.MetaData()
@@ -159,10 +182,16 @@ def update_wrmapcache(connection):
     transaction = connection.begin()
 
     # Query all sledruns
-    q = select([page, categorylinks, revision, text], (page.c.page_latest==revision.c.rev_id) & (text.c.old_id==revision.c.rev_text_id) & (categorylinks.c.cl_from==page.c.page_id) & (categorylinks.c.cl_to=='Rodelbahn'))
+    q = select(
+        [page, categorylinks, revision, text],
+        (page.c.page_latest == revision.c.rev_id) & (text.c.old_id == revision.c.rev_text_id) &
+        (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Rodelbahn'))
     sledrun_pages = connection.execute(q)
     # Original SQL:
-    # sql = u"select page_id, rev_id, old_id, page_title, old_text, 'In_Arbeit' in (select cl_to from categorylinks where cl_from=page_id) as under_construction from page, revision, text, categorylinks where page_latest=rev_id and old_id=rev_text_id and cl_from=page_id and cl_to='Rodelbahn' order by page_title"
+    # sql = u"select page_id, rev_id, old_id, page_title, old_text, 'In_Arbeit' in
+    # (select cl_to from categorylinks where cl_from=page_id) as under_construction
+    # from page, revision, text, categorylinks where page_latest=rev_id and old_id=rev_text_id and cl_from=page_id
+    # and cl_to='Rodelbahn' order by page_title"
     
     # Delete all existing entries in wrmappointcache
     # We rely on transactions MySQL InnoDB
@@ -178,7 +207,8 @@ def update_wrmapcache(connection):
             if len(wrmap_list) == 0:
                 continue  # not wrmap in page
             if len(wrmap_list) > 1:
-                raise UpdateCacheError('{} <wrmap ...> entries found in article "{}"'.format(len(wrmap_list), sledrun_page.page_title))
+                raise UpdateCacheError(
+                    f'{len(wrmap_list)} <wrmap ...> entries found in article "{sledrun_page.page_title}"')
             wrmap = wrmap_list[0]
             geojson = wrmwmarkup.parse_wrmap(str(wrmap))
 
@@ -190,18 +220,33 @@ def update_wrmapcache(connection):
                 if properties['type'] in wrmwmarkup.WRMAP_POINT_TYPES:
                     lon, lat = coordinates
                     label = properties.get('name')
-                    point_types = {'gasthaus': 'hut', 'haltestelle': 'busstop', 'parkplatz': 'carpark', 'achtung': 'warning', 'foto': 'photo', 'verleih': 'rental', 'punkt': 'point'}
+                    point_types = {
+                        'gasthaus': 'hut',
+                        'haltestelle': 'busstop',
+                        'parkplatz': 'carpark',
+                        'achtung': 'warning',
+                        'foto': 'photo',
+                        'verleih': 'rental',
+                        'punkt': 'point'
+                    }
                     point_type = point_types[properties['type']]
                     sql = 'insert into wrmappointcache (page_id, type, point, label) values (%s, %s, POINT(%s, %s), %s)'
                     connection.execute(sql, (sledrun_page.page_id, point_type, lon, lat, label))
 
                 # Paths
                 elif properties['type'] in wrmwmarkup.WRMAP_LINE_TYPES:
-                    path_types = {'rodelbahn': 'sledrun', 'gehweg': 'walkup', 'alternative': 'alternative', 'lift': 'lift', 'anfahrt': 'recommendedcarroute', 'linie': 'line'}
+                    path_types = {
+                        'rodelbahn': 'sledrun',
+                        'gehweg': 'walkup',
+                        'alternative': 'alternative',
+                        'lift': 'lift',
+                        'anfahrt': 'recommendedcarroute',
+                        'linie': 'line'}
                     path_type = path_types[properties['type']]
                     path = ", ".join(["{0} {1}".format(lon, lat) for lon, lat in coordinates])
                     path = 'LineString({0})'.format(path)
-                    if path_type == 'recommendedcarroute': continue
+                    if path_type == 'recommendedcarroute':
+                        continue
                     sql = 'insert into wrmappathcache (path, page_id, type) values (GeomFromText(%s), %s, %s)'
                     connection.execute(sql, (path, sledrun_page.page_id, path_type))
 
@@ -221,7 +266,8 @@ def update_wrregioncache(connection):
     
     >>> from sqlalchemy.engine import create_engine
     >>> engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4')
-    >>> # or: engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4&passwd=XXXXX')
+    >>> # or:
+    >>> # engine = create_engine('mysql://philipp@localhost:3306/philipp_winterrodeln_wiki?charset=utf8mb4&passwd=XXX')
     >>> update_wrregioncache(engine.connect())
     """
     metadata = schema.MetaData()
@@ -236,7 +282,19 @@ def update_wrregioncache(connection):
     connection.execute(wrregioncache.delete())
     
     # Query all combinations of sledruns and regions
-    sel = select([wrregion.c.id.label('region_id'), sqlfunc.AsWKB(wrregion.c.border).label('border'), wrsledruncache.c.page_id, wrsledruncache.c.position_longitude, wrsledruncache.c.position_latitude], sqlfunc.contains(wrregion.c.border, sqlfunc.point(wrsledruncache.c.position_longitude, wrsledruncache.c.position_latitude)))
+    sel = select(
+        [
+            wrregion.c.id.label('region_id'),
+            sqlfunc.AsWKB(wrregion.c.border).label('border'),
+            wrsledruncache.c.page_id,
+            wrsledruncache.c.position_longitude,
+            wrsledruncache.c.position_latitude
+        ],
+        sqlfunc.contains(
+            wrregion.c.border,
+            sqlfunc.point(wrsledruncache.c.position_longitude, wrsledruncache.c.position_latitude)
+        )
+    )
     ins = wrregioncache.insert()
 
     # Refill wrregioncache
index 3e1abb44774d320ba27a8be30febbee117c42a5b..cd44b719dd87e31ae04bb1c4ff707ae3aa90df42 100644 (file)
@@ -18,27 +18,28 @@ def wrreport_table(metadata):
     * version 1.5 added time_report
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("wrreport", metadata,
-    Column("id", types.Integer, primary_key=True),
-    Column("page_id", types.Integer, schema.ForeignKey('wrsledruncache.page_id')),
-    Column("page_title", types.Unicode(255), nullable=False),
-    Column("date_report", types.Date),
-    Column("time_report", types.Time),
-    Column("date_entry", types.DateTime, nullable=False),
-    Column("date_invalid", types.DateTime),
-    Column("condition", types.Integer),
-    Column("description", types.Unicode),
-    Column("author_name", types.Unicode(30)),
-    Column("author_ip", types.Unicode(15)),
-    Column("author_userid", types.Integer),
-    Column("author_username", types.Unicode(30)),
-    Column("delete_date", types.DateTime),
-    Column("delete_person_name", types.Unicode(30)),
-    Column("delete_person_ip", types.Unicode(15)),
-    Column("delete_person_userid", types.Integer),
-    Column("delete_person_username", types.Unicode(30)),
-    Column("delete_reason_public", types.Unicode),
-    Column("delete_invisible", types.Boolean),
+    return Table(
+        "wrreport", metadata,
+        Column("id", types.Integer, primary_key=True),
+        Column("page_id", types.Integer, schema.ForeignKey('wrsledruncache.page_id')),
+        Column("page_title", types.Unicode(255), nullable=False),
+        Column("date_report", types.Date),
+        Column("time_report", types.Time),
+        Column("date_entry", types.DateTime, nullable=False),
+        Column("date_invalid", types.DateTime),
+        Column("condition", types.Integer),
+        Column("description", types.Unicode),
+        Column("author_name", types.Unicode(30)),
+        Column("author_ip", types.Unicode(15)),
+        Column("author_userid", types.Integer),
+        Column("author_username", types.Unicode(30)),
+        Column("delete_date", types.DateTime),
+        Column("delete_person_name", types.Unicode(30)),
+        Column("delete_person_ip", types.Unicode(15)),
+        Column("delete_person_userid", types.Integer),
+        Column("delete_person_username", types.Unicode(30)),
+        Column("delete_reason_public", types.Unicode),
+        Column("delete_invisible", types.Boolean),
     )
 
 
@@ -48,42 +49,43 @@ def wrsledruncache_table(metadata):
     * version 1.4 (renamed table and added column walkup_possible)
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("wrsledruncache", metadata,
-    Column("page_id", types.Integer, schema.ForeignKey('wrreportcache.page_id'), primary_key=True),
-    Column("page_title", types.Unicode(255)),
-    Column("name_url", types.Unicode(255)),
-    Column("position_latitude", types.Float),
-    Column("position_longitude", types.Float),
-    Column("top_latitude", types.Float),
-    Column("top_longitude", types.Float),
-    Column("top_elevation", types.Integer),
-    Column("bottom_latitude", types.Float),
-    Column("bottom_longitude", types.Float),
-    Column("bottom_elevation", types.Integer),
-    Column("length", types.Integer),
-    Column("difficulty", types.Integer),
-    Column("avalanches", types.Integer),
-    Column("operator", types.Unicode(255)),
-    Column("public_transport", types.Integer),
-    Column("walkup_possible", types.Boolean),
-    Column("walkup_time", types.Integer),
-    Column("walkup_separate", types.Float),
-    Column("walkup_separate_comment", types.Unicode(255)),
-    Column("lift", types.Boolean),
-    Column("lift_details", types.Unicode(255)),
-    Column("night_light", types.Float),
-    Column("night_light_comment", types.Unicode(255)),
-    Column("night_light_days", types.Integer),
-    Column("night_light_days_comment", types.Unicode(255)),
-    Column("sled_rental", types.Boolean),
-    Column("sled_rental_comment", types.Unicode(255)),
-    Column("cachet", types.Unicode(255)),
-    Column("information_web", types.Unicode(255)),
-    Column("information_phone", types.Unicode(255)),
-    Column("image", types.Unicode(255)),
-    Column("show_in_overview", types.Boolean),
-    Column("forum_id", types.Integer),
-    Column("under_construction", types.Boolean),
+    return Table(
+        "wrsledruncache", metadata,
+        Column("page_id", types.Integer, schema.ForeignKey('wrreportcache.page_id'), primary_key=True),
+        Column("page_title", types.Unicode(255)),
+        Column("name_url", types.Unicode(255)),
+        Column("position_latitude", types.Float),
+        Column("position_longitude", types.Float),
+        Column("top_latitude", types.Float),
+        Column("top_longitude", types.Float),
+        Column("top_elevation", types.Integer),
+        Column("bottom_latitude", types.Float),
+        Column("bottom_longitude", types.Float),
+        Column("bottom_elevation", types.Integer),
+        Column("length", types.Integer),
+        Column("difficulty", types.Integer),
+        Column("avalanches", types.Integer),
+        Column("operator", types.Unicode(255)),
+        Column("public_transport", types.Integer),
+        Column("walkup_possible", types.Boolean),
+        Column("walkup_time", types.Integer),
+        Column("walkup_separate", types.Float),
+        Column("walkup_separate_comment", types.Unicode(255)),
+        Column("lift", types.Boolean),
+        Column("lift_details", types.Unicode(255)),
+        Column("night_light", types.Float),
+        Column("night_light_comment", types.Unicode(255)),
+        Column("night_light_days", types.Integer),
+        Column("night_light_days_comment", types.Unicode(255)),
+        Column("sled_rental", types.Boolean),
+        Column("sled_rental_comment", types.Unicode(255)),
+        Column("cachet", types.Unicode(255)),
+        Column("information_web", types.Unicode(255)),
+        Column("information_phone", types.Unicode(255)),
+        Column("image", types.Unicode(255)),
+        Column("show_in_overview", types.Boolean),
+        Column("forum_id", types.Integer),
+        Column("under_construction", types.Boolean),
     )
 
 
@@ -94,27 +96,28 @@ def wrinncache_table(metadata):
     * version 1.4 (no changes)
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("wrinncache", metadata,
-    Column("page_id", types.Integer, primary_key=True),
-    Column("page_title", types.Unicode(255)),
-    Column("position_latitude", types.Float),
-    Column("position_longitude", types.Float),
-    Column("position_elevation", types.Integer),
-    Column("operator", types.Unicode(255)),
-    Column("seats", types.Integer),
-    Column("overnight", types.Boolean),
-    Column("overnight_comment", types.Unicode(255)),
-    Column("smoker_area", types.Boolean),
-    Column("nonsmoker_area", types.Boolean),
-    Column("sled_rental", types.Boolean),
-    Column("sled_rental_comment", types.Unicode(255)),
-    Column("mobile_provider", types.Unicode),
-    Column("homepage", types.Unicode(255)),
-    Column("email_list", types.Unicode),
-    Column("phone_list", types.Unicode),
-    Column("image", types.Unicode(255)),
-    Column("sledding_list", types.Unicode),
-    Column("under_construction", types.Boolean),
+    return Table(
+        "wrinncache", metadata,
+        Column("page_id", types.Integer, primary_key=True),
+        Column("page_title", types.Unicode(255)),
+        Column("position_latitude", types.Float),
+        Column("position_longitude", types.Float),
+        Column("position_elevation", types.Integer),
+        Column("operator", types.Unicode(255)),
+        Column("seats", types.Integer),
+        Column("overnight", types.Boolean),
+        Column("overnight_comment", types.Unicode(255)),
+        Column("smoker_area", types.Boolean),
+        Column("nonsmoker_area", types.Boolean),
+        Column("sled_rental", types.Boolean),
+        Column("sled_rental_comment", types.Unicode(255)),
+        Column("mobile_provider", types.Unicode),
+        Column("homepage", types.Unicode(255)),
+        Column("email_list", types.Unicode),
+        Column("phone_list", types.Unicode),
+        Column("image", types.Unicode(255)),
+        Column("sledding_list", types.Unicode),
+        Column("under_construction", types.Boolean),
     )
 
 
@@ -124,15 +127,16 @@ def wrreportcache_table(metadata):
     * version 1.5 (introduction)
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("wrreportcache", metadata,
-    Column("page_id", types.Integer, primary_key=True),
-    Column("page_title", types.Unicode(255), nullable=False),
-    Column("report_id", types.Integer, schema.ForeignKey('wrreport.id')),
-    Column("date_report", types.Date),
-    Column("condition", types.Integer),
-    Column("description", types.Unicode),
-    Column("author_name", types.Unicode(30)),
-    Column("author_username", types.Unicode(30))
+    return Table(
+        "wrreportcache", metadata,
+        Column("page_id", types.Integer, primary_key=True),
+        Column("page_title", types.Unicode(255), nullable=False),
+        Column("report_id", types.Integer, schema.ForeignKey('wrreport.id')),
+        Column("date_report", types.Date),
+        Column("condition", types.Integer),
+        Column("description", types.Unicode),
+        Column("author_name", types.Unicode(30)),
+        Column("author_username", types.Unicode(30))
     )
 
 
@@ -142,11 +146,12 @@ def wrregion_table(metadata):
     * version 1.5 (introduction)
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("wrregion", metadata,
-    Column("id", types.Integer, primary_key=True),
-    Column("name", types.Unicode(50)),
-    Column("page_id", types.Integer),
-    Column("border", types.LargeBinary) # MultiPolygon(2, 4326)
+    return Table(
+        "wrregion", metadata,
+        Column("id", types.Integer, primary_key=True),
+        Column("name", types.Unicode(50)),
+        Column("page_id", types.Integer),
+        Column("border", types.LargeBinary)  # MultiPolygon(2, 4326)
     )
 
 
@@ -156,10 +161,11 @@ def wrregioncache_table(metadata):
     * version 1.5 (introduction)
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("wrregioncache", metadata,
-    Column("id", types.Integer, primary_key=True),
-    Column("region_id", types.Integer, schema.ForeignKey('wrregin.id')),
-    Column("page_id", types.Integer)
+    return Table(
+        "wrregioncache", metadata,
+        Column("id", types.Integer, primary_key=True),
+        Column("region_id", types.Integer, schema.ForeignKey('wrregion.id')),
+        Column("page_id", types.Integer)
     )
 
 
@@ -169,7 +175,8 @@ def wrintermapssledrun_table(metadata):
     * version 1.5 (introduction)
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("wrintermapssledrun", metadata,
+    return Table(
+        "wrintermapssledrun", metadata,
         Column("intermaps_sledrun_id", intermaps_sledrun_id_type, primary_key=True),
         Column("intermaps_sledrun_name", types.Unicode(255)),
         Column("intermaps_region_id", types.Integer),
@@ -187,7 +194,8 @@ def wrintermapsreport_table(metadata):
     * version 1.5 (introduction)
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("wrintermapsreport", metadata,
+    return Table(
+        "wrintermapsreport", metadata,
         Column("intermaps_sledrun_id", intermaps_sledrun_id_type, primary_key=True),
         Column("status", intermaps_sledrun_status_type, nullable=False),
         Column("last_update", types.DateTime, nullable=False),
@@ -202,7 +210,8 @@ def wrintermapsreporthistory_table(metadata):
     * version 1.5 (introduction)
     :param metadata: metadata = sqlalchemy.MetaData()
     """
-    return Table("wrintermapsreporthistory", metadata,
+    return Table(
+        "wrintermapsreporthistory", metadata,
         Column("id", types.Integer, primary_key=True),
         Column("intermaps_sledrun_id", intermaps_sledrun_id_type, nullable=False),
         Column("status", intermaps_sledrun_status_type, nullable=False),
index e52d20398a06577c381a8d765f139e71d7919252..4294393b237cdd0e528ce72f99b829f54dd26d01 100644 (file)
@@ -3,7 +3,10 @@
 import re
 import xml.etree.ElementTree
 import collections
-import mwparserfromhell
+from typing import Tuple, Optional, List
+
+from mwparserfromhell.nodes import Template
+
 import wrpylib.wrvalidators
 import wrpylib.mwmarkup
 import wrpylib.wrmwdb
@@ -48,7 +51,7 @@ def sledrun_from_rodelbahnbox(value, sledrun):
     return sledrun
 
 
-def sledrun_to_rodelbahnbox(sledrun):
+def sledrun_to_rodelbahnbox(sledrun) -> collections.OrderedDict:
     """Takes a sledrun instance that might come from the database and converts it to a OrderedDict ready
     to be formatted as RodelbahnBox."""
     value = collections.OrderedDict()
@@ -83,8 +86,8 @@ def inn_from_gasthausbox(value, inn):
     value is a dict of properties as returned by gasthausbox_from_str."""
     # page_id = None # this field is not updated because it is not present in the Gasthausbox
     # page_title = None # this field is not updated because it is not present in the Gasthausbox
-    def convtodb(value, key):
-        v = GASTHAUSBOX_DICT[key].to_str(value[key])
+    def convtodb(val, key):
+        v = GASTHAUSBOX_DICT[key].to_str(val[key])
         if v == '':
             return None
         return v
@@ -106,10 +109,10 @@ def inn_from_gasthausbox(value, inn):
     return inn
 
 
-def inn_to_gasthausbox(inn):
-    """Converts an inn class to a dict of Gasthausbox properties. value is an Inn instance."""
-    def convfromdb(value, key):
-        v = '' if value is None else value
+def inn_to_gasthausbox(inn) -> collections.OrderedDict:
+    """Converts an inn class to a dict of Gasthausbox properties. inn is an Inn instance."""
+    def convfromdb(val, key):
+        v = '' if val is None else val  # test the argument; `value` is the enclosing dict and never None
         return GASTHAUSBOX_DICT[key].from_str(v)
     value = collections.OrderedDict()
     value['Position'] = LonLat(inn.position_longitude, inn.position_latitude)
@@ -117,7 +120,8 @@ def inn_to_gasthausbox(inn):
     value['Betreiber'] = inn.operator
     value['Sitzplätze'] = inn.seats
     value['Übernachtung'] = (inn.overnight, inn.overnight_comment)
-    value['Rauchfrei'] = {(False, True): 0.0, (True, True): 0.5, (True, False): 1.0}.get((inn.nonsmoker_area, inn.smoker_area), None)
+    value['Rauchfrei'] = {(False, True): 0.0, (True, True): 0.5, (True, False): 1.0} \
+        .get((inn.nonsmoker_area, inn.smoker_area), None)
     value['Rodelverleih'] = (inn.sled_rental, inn.sled_rental_comment)
     value['Handyempfang'] = convfromdb(inn.mobile_provider, 'Handyempfang')
     value['Homepage'] = convfromdb(inn.homepage, 'Homepage')
@@ -128,16 +132,16 @@ def inn_to_gasthausbox(inn):
     return value
 
 
-def lonlat_ele_from_template(template):
-    """Template is a mwparserfromhell.nodes.template.Template instance. Returns (lonlat, ele)."""
+def lonlat_ele_from_template(template) -> Tuple[LonLat, Optional[int]]:
+    """Return (lonlat, ele) parsed from the first two params of a `mwparserfromhell.nodes.template.Template`."""
     lonlat = opt_lonlat_from_str(template.params[0].strip())
     ele = opt_uint_from_str(template.params[1].strip())
     return lonlat, ele
 
 
-def latlon_ele_to_template(lonlat_ele, name):
+def latlon_ele_to_template(lonlat_ele, name) -> Template:
     lonlat, ele = lonlat_ele
-    template = mwparserfromhell.nodes.template.Template(name)
+    template = Template(name)
     template.add(1, opt_lonlat_to_str(lonlat))
     template.add(2, opt_uint_to_str(ele))
     wrpylib.mwmarkup.format_template_oneline(template)
@@ -149,7 +153,7 @@ class ParseError(RuntimeError):
     pass
 
 
-def parse_wrmap_coordinates(coords):
+def parse_wrmap_coordinates(coords: str) -> List[List[float]]:
     """gets a string coordinates and returns an array of lon/lat coordinate pairs, e.g.
     47.12 N 11.87 E
     47.13 N 11.70 E
@@ -210,7 +214,7 @@ def parse_wrmap(wikitext):
         # determine feature type
         is_point = feature.tag in WRMAP_POINT_TYPES
         is_line = feature.tag in WRMAP_LINE_TYPES
-        if (not is_point and not is_line):
+        if not is_point and not is_line:
             raise ParseError('Unknown element <{}>.'.format(feature.tag))
 
         # point
@@ -219,11 +223,11 @@ def parse_wrmap(wikitext):
             allowed_properties = {'name', 'wiki'}
             wrong_properties = set(feature.attrib.keys()) - allowed_properties
             if len(wrong_properties) > 0:
-                raise ParseError("The attribute '{}' is not allowed at <{}>.".format(list(wrong_properties)[0], feature.tag))
+                raise ParseError(f"The attribute '{list(wrong_properties)[0]}' is not allowed at <{feature.tag}>.")
             properties.update(feature.attrib)
             coordinates = parse_wrmap_coordinates(feature.text)
             if len(coordinates) != 1:
-                raise ParseError('The element <{}> has to have exactly one coordinate pair.'.format(feature.tag))
+                raise ParseError(f'The element <{feature.tag}> has to have exactly one coordinate pair.')
             json_features.append({
                 'type': 'Feature',
                 'geometry': {'type': 'Point', 'coordinates': coordinates[0]},
@@ -235,14 +239,14 @@ def parse_wrmap(wikitext):
             allowed_properties = {'farbe', 'dicke'}
             wrong_properties = set(feature.attrib.keys()) - allowed_properties
             if len(wrong_properties) > 0:
-                raise ParseError("The attribute '{}' is not allowed at <{}>.".format(list(wrong_properties)[0], feature.tag))
+                raise ParseError(f"The attribute '{list(wrong_properties)[0]}' is not allowed at <{feature.tag}>.")
             if 'farbe' in feature.attrib: 
                 if not re.match('#[0-9a-fA-F]{6}$', feature.attrib['farbe']):
                     raise ParseError('The attribute "farbe" has to have a format like "#a0bb43".')
-                properties['strokeColor'] = feature.attrib['farbe'] # e.g. #a200b7
+                properties['strokeColor'] = feature.attrib['farbe']  # e.g. #a200b7
             if 'dicke' in feature.attrib:
                 try:
-                    properties['strokeWidth'] = int(feature.attrib['dicke']) # e.g. 6
+                    properties['strokeWidth'] = int(feature.attrib['dicke'])  # e.g. 6
                 except ValueError:
                     raise ParseError('The attribute "dicke" has to be an integer.')
             json_features.append({
index 48bc6b3cdabfe1fdab36e854153218ee8506987a..7f22cf0dc4205be739ab33c7bff26044fc69fbed 100644 (file)
@@ -12,7 +12,7 @@ import urllib.parse
 import re
 from collections import OrderedDict, namedtuple
 from email.errors import HeaderParseError
-from typing import Tuple, Optional, List, Any, Callable, Union, TypeVar, Dict
+from typing import Tuple, Optional, List, Callable, Union, TypeVar, Dict
 
 import mwparserfromhell  # https://github.com/earwig/mwparserfromhell
 
@@ -438,11 +438,11 @@ def masked_email_from_str(value: str, mask='(at)', masked_only=False) -> Tuple[s
 def masked_email_to_str(value: Tuple[str, bool], mask='(at)') -> str:
     """Value is a tuple. The first entry is the email address, the second one is a boolean telling whether the
     email address should be masked."""
-    email, do_masking = value
-    email = email_to_str(email)
+    email_, do_masking = value
+    email_ = email_to_str(email_)
     if do_masking:
-        email = email.replace('@', mask)
-    return email
+        email_ = email_.replace('@', mask)
+    return email_
 
 
 def emails_from_str(value: str) -> Optional[List[Tuple[str, str]]]:
@@ -521,7 +521,8 @@ def lonlat_from_str(value: str) -> LonLat:
     """Converts a Winterrodeln geo string like '47.076207 N 11.453553 E' (being '<latitude> N <longitude> E'
     to the LonLat(lon, lat) named  tuple."""
     r = re.match(r'(\d+\.\d+) N (\d+\.\d+) E', value)
-    if r is None: raise ValueError("Coordinates '{}' have not a format like '47.076207 N 11.453553 E'".format(value))
+    if r is None:
+        raise ValueError(f"Coordinates '{value}' have not a format like '47.076207 N 11.453553 E'")
     return LonLat(float(r.groups()[1]), float(r.groups()[0]))
 
 
@@ -683,9 +684,9 @@ CACHET_REGEXP = [r'(Tiroler Naturrodelbahn-Gütesiegel) ([12]\d{3}) (leicht|mitt
 
 def single_cachet_german_from_str(value: str) -> Tuple[str, str, str]:
     for pattern in CACHET_REGEXP:
-        match = re.match(pattern, value)
-        if match:
-            return match.groups()
+        match_ = re.match(pattern, value)
+        if match_:
+            return match_.groups()
     raise ValueError(f"'{value}' is no valid cachet")
 
 
@@ -778,9 +779,10 @@ def wikibox_from_template(template, converter_dict):
     # check if keys are superfluous
     superfluous_keys = {str(p.name.strip()) for p in template.params} - set(converter_dict.keys())
     for key in superfluous_keys:
-        exceptions_dict[key] = ValueError('Superfluous parameter: "{}"'.format(key))
+        exceptions_dict[key] = ValueError(f'Superfluous parameter: "{key}"')
     if len(exceptions_dict) > 0:
-        raise ValueErrorList('{} error(s) occurred when parsing template parameters.'.format(len(exceptions_dict)), exceptions_dict)
+        raise ValueErrorList(f'{len(exceptions_dict)} error(s) occurred when parsing template parameters.',
+                             exceptions_dict)
     return result
 
 
@@ -870,7 +872,7 @@ GASTHAUSBOX_TEMPLATE_NAME = 'Gasthausbox'
 
 
 GASTHAUSBOX_DICT = OrderedDict([
-    ('Position', opt_lonlat_converter), # '47.583333 N 15.75 E'
+    ('Position', opt_lonlat_converter),  # '47.583333 N 15.75 E'
     ('Höhe', opt_uint_converter),
     ('Betreiber', opt_str_converter),
     ('Sitzplätze', opt_uint_converter),