Work on parsing wikicode (currently Rodelbahnbox). master
authorPhilipp Spitzer <philipp@spitzer.priv.at>
Sun, 28 Nov 2021 22:14:30 +0000 (23:14 +0100)
committerPhilipp Spitzer <philipp@spitzer.priv.at>
Sun, 28 Nov 2021 22:14:30 +0000 (23:14 +0100)
23 files changed:
bots/json_edit.py [new file with mode: 0644]
bots/json_validate.py [new file with mode: 0644]
bots/sledrun_from_json.py [new file with mode: 0644]
bots/sledrun_wikitext_to_json.py [new file with mode: 0644]
setup.py
tests/test_json_validate.py [new file with mode: 0644]
tests/test_mwdb.py
tests/test_mwmarkup.py
tests/test_wrdem.py
tests/test_wrmwcache.py
tests/test_wrmwdb.py
tests/test_wrmwmarkup.py
tests/test_wrvalidators.py
wrpylib/json_tools.py [new file with mode: 0644]
wrpylib/mwdb.py
wrpylib/mwmarkup.py
wrpylib/templates/sledrun_wiki.txt [new file with mode: 0644]
wrpylib/wrdem.py
wrpylib/wrintermaps.py
wrpylib/wrmwcache.py
wrpylib/wrmwdb.py
wrpylib/wrmwmarkup.py
wrpylib/wrvalidators.py

diff --git a/bots/json_edit.py b/bots/json_edit.py
new file mode 100644 (file)
index 0000000..648b189
--- /dev/null
@@ -0,0 +1,75 @@
+#!/usr/bin/python
+"""
+User script for pywikibot (https://gerrit.wikimedia.org/r/pywikibot/core.git), tested with version 6.6.1.
+Put it in directory scripts/userscripts.
+
+Edit JSON associated with sledruns.
+
+The following generators and filters are supported:
+
+&params;
+"""
+import json
+
+import pywikibot
+from jsonschema import validate
+from pywikibot import pagegenerators, Page
+from pywikibot.bot import (
+    AutomaticTWSummaryBot,
+    ConfigParserBot,
+    ExistingPageBot,
+    NoRedirectPageBot,
+    SingleSiteBot,
+)
+
+from wrpylib.json_tools import order_json_keys
+from wrpylib.wrmwmarkup import create_sledrun_wiki
+
+
+docuReplacements = {'&params;': pagegenerators.parameterHelp}
+
+
+class SledrunFromJsonBot(
+    SingleSiteBot,
+    ConfigParserBot,
+    ExistingPageBot,
+    NoRedirectPageBot,
+    AutomaticTWSummaryBot,
+):
+    def setup(self) -> None:
+        schema = Page(self.site, 'Winterrodeln:Datenschema/Rodelbahn/V1.json')
+        assert schema.content_model == 'json'
+        self.sledrun_schema = json.loads(schema.text)
+
+    def treat_page(self) -> None:
+        """Load the given page, do some changes, and save it."""
+        if self.current_page.content_model != 'json':
+            return
+        content_json = json.loads(self.current_page.text)
+
+        # *here*, content_json can be processed
+        processed_json = content_json
+
+        validate(instance=processed_json, schema=self.sledrun_schema)
+        processed_json_ordered = order_json_keys(processed_json, self.sledrun_schema)
+        assert processed_json_ordered == processed_json
+        text = json.dumps(processed_json_ordered, ensure_ascii=False, indent=4)
+
+        summary = 'JSON Daten aktualisiert.'
+        self.put_current(text, summary=summary)
+
+
+def main(*args: str) -> None:
+    local_args = pywikibot.handle_args(args)
+    gen_factory = pagegenerators.GeneratorFactory()
+    gen_factory.handle_args(local_args)
+    gen = gen_factory.getCombinedGenerator(preload=True)
+    if gen:
+        bot = SledrunFromJsonBot(generator=gen)
+        bot.run()
+    else:
+        pywikibot.bot.suggest_help(missing_generator=True)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/bots/json_validate.py b/bots/json_validate.py
new file mode 100644 (file)
index 0000000..4dbccfd
--- /dev/null
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+"""
+User script for pywikibot (https://gerrit.wikimedia.org/r/pywikibot/core.git), tested with version 6.6.1.
+Put it in directory scripts/userscripts.
+
+Edit JSON associated with sledruns.
+
+The following generators and filters are supported:
+
+&params;
+"""
+import json
+import jsonschema
+from jsonschema import validate
+
+import pywikibot
+from pywikibot import pagegenerators, Page
+from pywikibot.bot import (
+    AutomaticTWSummaryBot,
+    ConfigParserBot,
+    ExistingPageBot,
+    NoRedirectPageBot,
+    SingleSiteBot,
+)
+
+from wrpylib.json_tools import order_json_keys
+
+docuReplacements = {'&params;': pagegenerators.parameterHelp}
+
+
+class SledrunFromJsonBot(
+    SingleSiteBot,
+    ConfigParserBot,
+    ExistingPageBot,
+    NoRedirectPageBot,
+    AutomaticTWSummaryBot,
+):
+    def setup(self) -> None:
+        schema = Page(self.site, 'Winterrodeln:Datenschema/Rodelbahn/V1.json')
+        assert schema.content_model == 'json'
+        self.sledrun_schema = json.loads(schema.text)
+
+    def treat_page(self) -> None:
+        """Load the given page, do some changes, and save it."""
+        content_json = json.loads(self.current_page.text)
+        try:
+            validate(instance=content_json, schema=self.sledrun_schema)
+
+            content_json_ordered = order_json_keys(content_json, self.sledrun_schema)
+            assert content_json_ordered == content_json
+            text = json.dumps(content_json_ordered, ensure_ascii=False, indent=4)
+
+        except jsonschema.exceptions.ValidationError as e:
+            text = str(e)
+
+        summary = 'JSON Daten aktualisiert.'
+        self.put_current(text, summary=summary)
+
+
+def main(*args: str) -> None:
+    local_args = pywikibot.handle_args(args)
+    gen_factory = pagegenerators.GeneratorFactory()
+    gen_factory.handle_args(local_args)
+    gen = gen_factory.getCombinedGenerator(preload=True)
+    if gen:
+        bot = SledrunFromJsonBot(generator=gen)
+        bot.run()
+    else:
+        pywikibot.bot.suggest_help(missing_generator=True)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/bots/sledrun_from_json.py b/bots/sledrun_from_json.py
new file mode 100644 (file)
index 0000000..8a16f15
--- /dev/null
@@ -0,0 +1,72 @@
+#!/usr/bin/python
+"""
+User script for pywikibot (https://gerrit.wikimedia.org/r/pywikibot/core.git), tested with version 6.6.1.
+Put it in directory scripts/userscripts.
+
+Replace a sledrun page with content generated from associated JSON subpages
+(Rodelbahn and Landkarte).
+
+The following generators and filters are supported:
+
+&params;
+"""
+import json
+
+import pywikibot
+from pywikibot import pagegenerators, Page
+from pywikibot.bot import (
+    AutomaticTWSummaryBot,
+    ConfigParserBot,
+    ExistingPageBot,
+    NoRedirectPageBot,
+    SingleSiteBot,
+)
+from pywikibot.logging import warning
+
+from wrpylib.wrmwmarkup import create_sledrun_wiki
+
+
+docuReplacements = {'&params;': pagegenerators.parameterHelp}
+
+
+class SledrunFromJsonBot(
+    SingleSiteBot,
+    ConfigParserBot,
+    ExistingPageBot,
+    NoRedirectPageBot,
+    AutomaticTWSummaryBot,
+):
+    def treat_page(self) -> None:
+        """Load the given page, do some changes, and save it."""
+        sledrun_json_page = Page(self.site, self.current_page.title() + '/Rodelbahn.json')
+        if not sledrun_json_page.exists():
+            warning(f"{sledrun_json_page.title()} does not exist. Skipping.")
+            return
+        if sledrun_json_page.content_model != 'json':
+            warning(f"Content model of {sledrun_json_page.title()} is not 'json'.")
+            return
+        sledrun_json = json.loads(sledrun_json_page.text)
+        map_json_page = Page(self.site, self.current_page.title() + '/Landkarte.json')
+        if map_json_page.exists():
+            map_json = json.loads(map_json_page.text)
+        else:
+            map_json = None
+        text = create_sledrun_wiki(sledrun_json, map_json)
+        summary = 'Rodelbahnbeschreibung von aus JSON Daten aktualisiert.'
+        self.put_current(text, summary=summary)
+
+
+def main(*args: str) -> None:
+    local_args = pywikibot.handle_args(args)
+    gen_factory = pagegenerators.GeneratorFactory()
+    gen_factory.handle_args(local_args)
+    gen = gen_factory.getCombinedGenerator(preload=True)
+    if gen:
+        bot = SledrunFromJsonBot(generator=gen)
+        bot.run()
+    else:
+        pywikibot.bot.suggest_help(missing_generator=True)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/bots/sledrun_wikitext_to_json.py b/bots/sledrun_wikitext_to_json.py
new file mode 100644 (file)
index 0000000..686a196
--- /dev/null
@@ -0,0 +1,167 @@
+#!/usr/bin/python
+"""
+User script for pywikibot (https://gerrit.wikimedia.org/r/pywikibot/core.git), tested with version 6.6.1.
+Put it in directory scripts/userscripts.
+
+Create a sledrun JSON page from a sledrun wikitext page (including map).
+
+The following generators and filters are supported:
+
+&params;
+"""
+import json
+
+import mwparserfromhell
+import pywikibot
+from pywikibot import pagegenerators, Page
+from pywikibot.bot import (
+    AutomaticTWSummaryBot,
+    ConfigParserBot,
+    ExistingPageBot,
+    NoRedirectPageBot,
+    SingleSiteBot,
+)
+from pywikibot.logging import warning
+from pywikibot.site._namespace import BuiltinNamespace
+
+from wrpylib.wrmwmarkup import create_sledrun_wiki, lonlat_to_json, lonlat_ele_to_json
+from wrpylib.wrvalidators import rodelbahnbox_from_template, tristate_german_to_str, difficulty_german_to_str, \
+    avalanches_german_to_str, public_transport_german_to_str
+
+from pywikibot.site import Namespace
+
+docuReplacements = {'&params;': pagegenerators.parameterHelp}
+
+
+class SledrunWikiTextToJsonBot(
+    SingleSiteBot,
+    ConfigParserBot,
+    ExistingPageBot,
+    NoRedirectPageBot,
+    AutomaticTWSummaryBot,
+):
+    def treat_page(self) -> None:
+        """Load the given page, do some changes, and save it."""
+        wikitext_content_model = 'wikitext'
+        if self.current_page.content_model != wikitext_content_model:
+            warning(f"The content model of {self.current_page.title()} is {self.current_page.content_model} "
+                    f"instead of {wikitext_content_model}.")
+            return
+
+        wikicode = mwparserfromhell.parse(self.current_page.text)
+        wikilink_list = wikicode.filter_wikilinks()
+        category_sledrun = 'Kategorie:Rodelbahn'
+        if sum(1 for c in wikilink_list if c.title == category_sledrun) == 0:
+            warning(f'The page {self.current_page.title()} does not have category {category_sledrun}.')
+            return
+
+        sledrun_json_page = Page(self.site, self.current_page.title() + '/Rodelbahn.json')
+        if sledrun_json_page.exists():
+            warning(f"{sledrun_json_page.title()} already exists, skipping {self.current_page.title()}.")
+            return
+
+        map_json_page = Page(self.site, self.current_page.title() + '/Landkarte.json')
+        if map_json_page.exists():
+            warning(f"{map_json_page.title()} already exists, skipping {self.current_page.title()}.")
+            return
+
+        sledrun_json = {
+            "name": self.current_page.title(),
+            "aliases": [],
+            "entry_under_construction": sum(1 for c in wikilink_list if c.text == 'Kategorie:In Arbeit') > 0,
+            "description": "Holadrio!",
+        }
+
+        map_json = None
+
+        rbb_list = wikicode.filter_templates(recursive=False, matches=lambda t: t.name.strip() == 'Rodelbahnbox')
+        if len(rbb_list) == 1:
+            rbb = rodelbahnbox_from_template(rbb_list[0])
+            v = rbb['Bild']
+            if v is not None:
+                image_page = Page(self.site, v, ns=BuiltinNamespace.FILE)
+                if image_page.exists():
+                    warning(f"{image_page.title()} does not exist.")
+                sledrun_json['image'] = v
+
+            v = rbb['Länge']
+            if v is not None:
+                sledrun_json['length'] = v
+
+            v = rbb['Schwierigkeit']
+            if v is not None:
+                sledrun_json['difficulty'] = difficulty_german_to_str(v)
+
+            v = rbb['Lawinen']
+            if v is not None:
+                sledrun_json['avalanches'] = avalanches_german_to_str(v)
+
+            v, w = rbb['Betreiber']
+            if v is not None:
+                sledrun_json['has_operator'] = v
+            if w is not None:
+                sledrun_json['operator'] = w
+
+            v = rbb['Aufstieg möglich']
+            if v is not None:
+                sledrun_json['walkup_possible'] = v
+
+            v, w = rbb['Aufstieg getrennt']
+            if v is not None:
+                sledrun_json['walkup_separate'] = tristate_german_to_str(v)
+            if w is not None:
+                sledrun_json['walkup_comment'] = w  # TODO
+
+            v = rbb['Gehzeit']
+            if v is not None:
+                sledrun_json['walkup_time'] = v
+
+            v, w = rbb['Beleuchtungsanlage']
+            if v is not None:
+                sledrun_json['nightlight_possible'] = tristate_german_to_str(v)
+            if w is not None:
+                sledrun_json['nightlight_description'] = w
+
+            v = rbb['In Übersichtskarte']
+            if v is not None:
+                sledrun_json['show_in_overview'] = v
+
+            v = rbb['Forumid']
+            if v is not None:
+                sledrun_json['forum_id'] = v
+
+            v = rbb['Position']
+            if v is not None:
+                sledrun_json['position'] = lonlat_to_json(v)
+
+            v = lonlat_ele_to_json(rbb['Position oben'], rbb['Höhe oben'])
+            if v != {}:
+                sledrun_json['top'] = v
+
+            v = lonlat_ele_to_json(rbb['Position unten'], rbb['Höhe unten'])
+            if v != {}:
+                sledrun_json['bottom'] = v
+
+            v = rbb['Öffentliche Anreise']
+            if v is not None:
+                sledrun_json['public_transport'] = public_transport_german_to_str(v)
+
+        text = create_sledrun_wiki(sledrun_json, map_json)
+        summary = 'Rodelbahnbeschreibung nach Konvertierung nach und von JSON.'
+        self.put_current(text, summary=summary)
+
+
+def main(*args: str) -> None:
+    local_args = pywikibot.handle_args(args)
+    gen_factory = pagegenerators.GeneratorFactory()
+    gen_factory.handle_args(local_args)
+    gen = gen_factory.getCombinedGenerator(preload=True)
+    if gen:
+        bot = SledrunWikiTextToJsonBot(generator=gen)
+        bot.run()
+    else:
+        pywikibot.bot.suggest_help(missing_generator=True)
+
+
+if __name__ == '__main__':
+    main()
index 3d2c12a56ba95da9ce491050cbb35852d461cddc..46cdfb1c80a9141354784c1fc78768b19d024823 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -24,4 +24,6 @@ setup(name='wrpylib',
         'scripts/updatewrreportcache.py',
         'scripts/updatewrsledruncache.py',
     ],
         'scripts/updatewrreportcache.py',
         'scripts/updatewrsledruncache.py',
     ],
+    package_data={'wrpylib': ['templates/*']},
+    zip_safe=False,
 )
 )
diff --git a/tests/test_json_validate.py b/tests/test_json_validate.py
new file mode 100644 (file)
index 0000000..01221e9
--- /dev/null
@@ -0,0 +1,292 @@
+import unittest
+from copy import deepcopy
+from typing import Dict
+
+from wrpylib.json_tools import order_json_keys, _resolve_ref
+
+schema_object: Dict = {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "required": [
+        "name"
+    ],
+    "properties": {
+        "name": {
+            "type": "string",
+        },
+        "aliases": {
+            "type": "array",
+            "items": {
+                "type": "string",
+                "title": "Alternativer Name"
+            }
+        },
+        "entry_under_construction": {
+            "type": "boolean",
+            "default": True,
+        },
+        "length": {
+            "type": "number",
+            "minimum": 1,
+            "optional": True
+        },
+        "difficulty": {
+            "type": "string",
+            "enum": [
+                "leicht",
+                "mittel",
+                "schwer"
+            ],
+        },
+        "walkup_time": {
+            "type": "number",
+        },
+    }
+}
+
+
+schema_array = {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "array",
+    "items": {
+        "type": "string",
+    }
+}
+
+
+class TestResolveRef(unittest.TestCase):
+    def test_no_ref(self):
+        sub_schema = schema_object['properties']['aliases']
+        actual = _resolve_ref(sub_schema, schema_object)
+        self.assertEqual(actual, sub_schema)
+
+    def test_ref(self):
+        root_schema = schema_object.copy()
+        root_schema["definitions"] = {
+            "weblink": {
+                "type": "string",
+            }
+        }
+        sub_schema = {
+            "$ref": "#/definitions/weblink"
+        }
+        expected = {
+            "type": "string",
+        }
+        actual = _resolve_ref(sub_schema, root_schema)
+        self.assertEqual(expected, actual)
+
+
+class TestOrderJsonKeys(unittest.TestCase):
+    def test_string_empty(self):
+        actual = order_json_keys('', {"type": "string"})
+        self.assertEqual('', actual)
+
+    def test_string_valid(self):
+        actual = order_json_keys('äüß', {"type": "string"})
+        self.assertEqual('äüß', actual)
+
+    def test_string_wrong_types(self):
+        for value in [True, False, None, 0, 1, 0.5, [], ['abc'], {}, {'a': 'b'}]:
+            with self.subTest(f'Try invalid value {value}.', value=value):
+                with self.assertRaises(ValueError):
+                    order_json_keys(value, {"type": "string"})
+
+    def test_number_valid(self):
+        for value in [0, 1, -10, 8.8, 0.0, -1024.9]:
+            with self.subTest(f'Valid number {value}.', value=value):
+                actual = order_json_keys(value, {"type": "number"})
+                self.assertEqual(value, actual)
+
+    def test_number_wrong_types(self):
+        for value in [True, False, None, '', 'abc', [], [0], [1], {}, {'a': 1}]:
+            with self.subTest(f'Try invalid value {value}.', value=value):
+                with self.assertRaises(ValueError):
+                    order_json_keys(value, {"type": "number"})
+
+    def test_object_empty(self):
+        schema = schema_object.copy()
+        del schema['required']
+        actual = order_json_keys({}, schema)
+        self.assertEqual({}, actual)
+
+    def test_object_missing_keys(self):
+        with self.assertRaises(ValueError):
+            order_json_keys({}, schema_object)
+
+    def test_object_some_keys(self):
+        actual = order_json_keys({'name': 'X', 'walkup_time': 30, 'difficulty': 'mittel'}, schema_object)
+        self.assertEqual({'name': 'X', 'difficulty': 'mittel', 'walkup_time': 30}, actual)
+
+    def test_object_all_keys(self):
+        value = {
+            'walkup_time': 120,
+            'aliases': ['c', 'a', 'b'],
+            'entry_under_construction': False,
+            'name': 'ÖÄÜ',
+            'difficulty': 'leicht',
+            'length': 60,
+        }
+        expected = {
+            'name': 'ÖÄÜ',
+            'aliases': ['c', 'a', 'b'],
+            'entry_under_construction': False,
+            'length': 60,
+            'difficulty': 'leicht',
+            'walkup_time': 120,
+        }
+        actual = order_json_keys(value, schema_object)
+        self.assertEqual(expected, actual)
+
+    def test_object_additional_keys(self):
+        value = {
+            'walkup_time': 120,
+            'aliases': ['c', 'a', 'b'],
+            'entry_under_construction': False,
+            'name': 'ÖÄÜ',
+            'surprise': True,
+            'difficulty': 'leicht',
+            'length': 60,
+        }
+        expected = {
+            'name': 'ÖÄÜ',
+            'aliases': ['c', 'a', 'b'],
+            'entry_under_construction': False,
+            'length': 60,
+            'difficulty': 'leicht',
+            'walkup_time': 120,
+            'surprise': True,
+        }
+        actual = order_json_keys(value, schema_object)
+        self.assertEqual(expected, actual)
+
+    def test_object_forbidden_additional_keys(self):
+        value = {
+            'name': 'abc',
+            'surprise': True,
+            'difficulty': 'leicht',
+        }
+        schema = schema_object.copy()
+        schema['additionalProperties'] = False
+        with self.assertRaises(ValueError):
+            order_json_keys(value, schema)
+
+    def test_object_nested(self):
+        schema = deepcopy(schema_object)
+        schema['properties']['nested'] = schema_object
+        value = {
+            'nested': {
+                'length': 3,
+                'name': 'Mustermann',
+            },
+            'name': 'parent',
+        }
+        expected = {
+            'name': 'parent',
+            'nested': {
+                'name': 'Mustermann',
+                'length': 3,
+            }
+        }
+        actual = order_json_keys(value, schema)
+        self.assertEqual(expected, actual)
+
+    def test_object_nested_wrong_type(self):
+        schema = deepcopy(schema_object)
+        schema['properties']['nested'] = schema_object
+        with self.assertRaises(ValueError) as cm:
+            order_json_keys({'name': 'name', 'nested': {'name': True}}, schema)
+        self.assertIn("['nested']['name']", str(cm.exception))
+
+    def test_array_empty(self):
+        schema = schema_array
+        actual = order_json_keys([], schema)
+        self.assertEqual([], actual)
+
+    def test_array_string(self):
+        schema = schema_array
+        actual = order_json_keys(['', 'a', 'ßöÄ'], schema)
+        self.assertEqual(['', 'a', 'ßöÄ'], actual)
+
+    def test_array_number(self):
+        schema = deepcopy(schema_array)
+        schema['items']['type'] = "number"
+        value = [0, 6.3, -3]
+        actual = order_json_keys(value, schema)
+        self.assertEqual(value, actual)
+
+    def test_array_object(self):
+        schema = deepcopy(schema_array)
+        schema['items']['type'] = "object"
+        value = [{}, {'a': 'b'}]
+        actual = order_json_keys(value, schema)
+        self.assertEqual(value, actual)
+
+    def test_array_array(self):
+        schema = deepcopy(schema_array)
+        schema['items']['type'] = "array"
+        value = [[], [3], [6, 8, 3.3]]
+        actual = order_json_keys(value, schema)
+        self.assertEqual(value, actual)
+
+    def test_array_boolean(self):
+        schema = deepcopy(schema_array)
+        schema['items']['type'] = "boolean"
+        value = [False, True, False]
+        actual = order_json_keys(value, schema)
+        self.assertEqual(value, actual)
+
+    def test_array_null(self):
+        schema = deepcopy(schema_array)
+        schema['items']['type'] = "null"
+        value = [None, None]
+        actual = order_json_keys(value, schema)
+        self.assertEqual(value, actual)
+
+    def test_array_wrong_string_types(self):
+        schema = {
+            "type": "array",
+            "items": {
+                "type": "string",
+            }
+        }
+        for v in [7, 0.4, {}, {'a': 'b'}, [], [1, 2, 3], True, False, None]:
+            value = [v]
+            with self.subTest(f'Try invalid value {value}.', value=value):
+                with self.assertRaises(ValueError):
+                    order_json_keys(value, schema)
+
+    def test_array_mixed_types(self):
+        schema = {
+            "type": "array",
+            "items": {
+                "type": "string",
+            }
+        }
+        for v in [7, 0.4, {}, {'a': 'b'}, [], [1, 2, 3], True, False, None]:
+            value = ['a', v]
+            with self.subTest(f'Try invalid value {value}.', value=value):
+                with self.assertRaises(ValueError):
+                    order_json_keys(value, schema)
+
+    def test_boolean_valid(self):
+        for value in [False, True]:
+            with self.subTest(f'Valid boolean {value}.', value=value):
+                actual = order_json_keys(value, {"type": "boolean"})
+                self.assertEqual(value, actual)
+
+    def test_boolean_wrong_types(self):
+        for value in [0, 1, -3, 10, 5.5, None, '', 'True', [], [True], [False], {}, {'a': True}]:
+            with self.subTest(f'Try invalid value {value}.', value=value):
+                with self.assertRaises(ValueError):
+                    order_json_keys(value, {"type": "boolean"})
+
+    def test_null_valid(self):
+        actual = order_json_keys(None, {"type": "null"})
+        self.assertEqual(None, actual)
+
+    def test_null_wrong_types(self):
+        for value in [True, False, 0, 1, -3, 10, 5.5, '', 'abc', [], [0], [1], {}, {'a': 1}]:
+            with self.subTest(f'Try invalid value {value}.', value=value):
+                with self.assertRaises(ValueError):
+                    order_json_keys(value, {"type": "null"})
index 92a2b3e53133af2c83f9c3d52a8560c5b9384ded..640566b3c503f596d3ba46a0a0edd11153231380 100644 (file)
@@ -1,4 +1,3 @@
-#!/usr/bin/python3.4
 import unittest
 import MySQLdb
 import sqlalchemy
 import unittest
 import MySQLdb
 import sqlalchemy
@@ -22,7 +21,7 @@ class TestMwDb(unittest.TestCase):
 
     def test_page_table(self):
         Page = wrpylib.mwdb.page_table(self.metadata)
 
     def test_page_table(self):
         Page = wrpylib.mwdb.page_table(self.metadata)
-        page = self.session.query(Page).filter(Page.c.page_id==1321).first()
+        page = self.session.query(Page).filter(Page.c.page_id == 1321).first()
         self.assertEqual(page.page_id, 1321)
         self.assertEqual(type(page.page_title), str)
         self.assertEqual(type(page.page_restrictions), bytes)
         self.assertEqual(page.page_id, 1321)
         self.assertEqual(type(page.page_title), str)
         self.assertEqual(type(page.page_restrictions), bytes)
@@ -32,8 +31,8 @@ class TestMwDb(unittest.TestCase):
         Revision = wrpylib.mwdb.revision_table(self.metadata)
         revision = self.session.query(Revision).filter(Revision.c.rev_id == 666).first()
         self.assertEqual(revision.rev_id, 666)
         Revision = wrpylib.mwdb.revision_table(self.metadata)
         revision = self.session.query(Revision).filter(Revision.c.rev_id == 666).first()
         self.assertEqual(revision.rev_id, 666)
-        self.assertEqual(type(revision.rev_comment), str)
-        self.assertEqual(type(revision.rev_user_text), str)
+        self.assertEqual(type(revision.rev_comment_id), int)
+        self.assertEqual(type(revision.rev_actor), int)
         self.assertEqual(type(revision.rev_timestamp), str)
 
     def test_text_table(self):
         self.assertEqual(type(revision.rev_timestamp), str)
 
     def test_text_table(self):
@@ -44,7 +43,6 @@ class TestMwDb(unittest.TestCase):
         self.assertEqual(type(text.old_flags), str)
         self.assertEqual(text.old_flags, 'utf-8')
 
         self.assertEqual(type(text.old_flags), str)
         self.assertEqual(text.old_flags, 'utf-8')
 
-
     def test_user_table(self):
         User = wrpylib.mwdb.user_table(self.metadata)
         user = self.session.query(User).filter(User.c.user_id == 1).first()
     def test_user_table(self):
         User = wrpylib.mwdb.user_table(self.metadata)
         user = self.session.query(User).filter(User.c.user_id == 1).first()
@@ -54,7 +52,6 @@ class TestMwDb(unittest.TestCase):
         self.assertEqual(type(user.user_email), str)
         self.assertEqual(user.user_name, 'Philipp')
 
         self.assertEqual(type(user.user_email), str)
         self.assertEqual(user.user_name, 'Philipp')
 
-
     def test_categorylinks_table(self):
         Categorylinks = wrpylib.mwdb.categorylinks_table(self.metadata)
         categorylinks = self.session.query(Categorylinks).filter(Categorylinks.c.cl_from == 609).first()
     def test_categorylinks_table(self):
         Categorylinks = wrpylib.mwdb.categorylinks_table(self.metadata)
         categorylinks = self.session.query(Categorylinks).filter(Categorylinks.c.cl_from == 609).first()
@@ -69,6 +66,7 @@ class TestMySqlPython(unittest.TestCase):
     because byte strings are returned instead of unicode for columns having
     a _bin collation, see https://sourceforge.net/p/mysql-python/bugs/289/
     This has been fixed in MySQL_python version 1.2.4."""
     because byte strings are returned instead of unicode for columns having
     a _bin collation, see https://sourceforge.net/p/mysql-python/bugs/289/
     This has been fixed in MySQL_python version 1.2.4."""
+
     @classmethod
     def setUpClass(cls):
         cls.db = MySQLdb.connect(db='philipp_winterrodeln_wiki', charset='utf8mb4')
     @classmethod
     def setUpClass(cls):
         cls.db = MySQLdb.connect(db='philipp_winterrodeln_wiki', charset='utf8mb4')
@@ -88,9 +86,9 @@ class TestMySqlPython(unittest.TestCase):
         self.assertEqual(type(result[2]), bytes)  # binary(14) NOT NULL
 
     def test_datatype_revision(self):
         self.assertEqual(type(result[2]), bytes)  # binary(14) NOT NULL
 
     def test_datatype_revision(self):
-        result = self.exec_sql('select rev_comment, rev_user_text, rev_timestamp from revision where rev_id = 7586')
-        self.assertEqual(type(result[0]), bytes)  # tinyblob NOT NULL
-        self.assertEqual(type(result[1]), bytes)  # varbinary(255) NOT NULL DEFAULT ''
+        result = self.exec_sql('select rev_comment_id, rev_actor, rev_timestamp from revision where rev_id = 7586')
+        self.assertEqual(type(result[0]), int)  # tinyblob NOT NULL
+        self.assertEqual(type(result[1]), int)  # varbinary(255) NOT NULL DEFAULT ''
         self.assertEqual(type(result[2]), bytes)  # binary(14) NOT NULL
 
     def test_datatypes_text(self):
         self.assertEqual(type(result[2]), bytes)  # binary(14) NOT NULL
 
     def test_datatypes_text(self):
index 41225f13a1d2bc733712d7e402f20e13e90ea2f1..14e0e17a65d8db76fa25d032fa1e4eabecd7cde2 100644 (file)
@@ -1,4 +1,3 @@
-#!/usr/bin/python3.4
 import unittest
 import mwparserfromhell
 import wrpylib.mwmarkup
 import unittest
 import mwparserfromhell
 import wrpylib.mwmarkup
@@ -39,8 +38,8 @@ class TestMwParserFromHell(unittest.TestCase):
         rb = list(wikicode.filter_templates())[0]
         self.assertEqual(rb.name.strip(), 'Rodelbahnbox')
         self.assertEqual(rb.get('Aufstiegshilfe').value.strip(), 'Nein')
         rb = list(wikicode.filter_templates())[0]
         self.assertEqual(rb.name.strip(), 'Rodelbahnbox')
         self.assertEqual(rb.get('Aufstiegshilfe').value.strip(), 'Nein')
-        self.assertEqual(rb[:2], '{{')
-        self.assertEqual(rb[-2:], '}}')
+        self.assertEqual(str(rb)[:2], '{{')
+        self.assertEqual(str(rb)[-2:], '}}')
 
     def test_template_to_table(self):
         wikitext = '{{Rodelbahnbox | Unbenannt | Position = 47.309820 N 9.986508 E | Aufstieg möglich = Ja }}'
 
     def test_template_to_table(self):
         wikitext = '{{Rodelbahnbox | Unbenannt | Position = 47.309820 N 9.986508 E | Aufstieg möglich = Ja }}'
index 8304ea235953976f064b8edcdd0e75bf38e40c82..fb6f0f69fa3dc70c2279e1feb6e562d07ec02388 100644 (file)
@@ -1,7 +1,7 @@
 import unittest
 
 import owslib.wms
 import unittest
 
 import owslib.wms
-import rasterio
+import rasterio  # pip install rasterio
 
 from wrpylib.wrdem import get_ele_from_raster, get_ele_from_wms, MultiDem, transform_lon_lat, DemSwitzerland, \
     DemBasemap, DemBavaria
 
 from wrpylib.wrdem import get_ele_from_raster, get_ele_from_wms, MultiDem, transform_lon_lat, DemSwitzerland, \
     DemBasemap, DemBavaria
index e4ea4b3e0c716fc3a8931ce76f44eb65cc789ef0..99bc00ab05135f0b7f48f3bce871a6e9c2668aa8 100644 (file)
@@ -1,4 +1,3 @@
-#!/usr/bin/python3.4
 from sqlalchemy.engine import create_engine
 from wrpylib import wrmwcache
 import unittest
 from sqlalchemy.engine import create_engine
 from wrpylib import wrmwcache
 import unittest
index 242db2b77a4b0725b96ee946279be7e26902860e..c57ef89c6715ce9ddedab5ca5f0d3160bb80a345 100644 (file)
@@ -1,4 +1,3 @@
-#!/usr/bin/python3.4
 import os
 import unittest
 import sqlalchemy
 import os
 import unittest
 import sqlalchemy
index 2be3f6e796db5aa6c18dd3b2fa4fe1086d9cdc9c..273fa36c5b58e553250d463e1987a6684b0c4f49 100644 (file)
@@ -1,4 +1,3 @@
-#!/usr/bin/python3.4
 import collections
 import textwrap
 import unittest
 import collections
 import textwrap
 import unittest
@@ -7,7 +6,8 @@ import wrpylib.wrvalidators
 import wrpylib.wrmwmarkup
 from wrpylib.wrvalidators import LonLat
 from wrpylib.wrmwmarkup import sledrun_from_rodelbahnbox, sledrun_to_rodelbahnbox, \
 import wrpylib.wrmwmarkup
 from wrpylib.wrvalidators import LonLat
 from wrpylib.wrmwmarkup import sledrun_from_rodelbahnbox, sledrun_to_rodelbahnbox, \
-    inn_from_gasthausbox, inn_to_gasthausbox, lonlat_ele_from_template, latlon_ele_to_template
+    inn_from_gasthausbox, inn_to_gasthausbox, lonlat_ele_from_template, latlon_ele_to_template, create_sledrun_wiki, \
+    lonlat_to_json, lonlat_ele_to_json
 
 
 class TestSledrun(unittest.TestCase):
 
 
 class TestSledrun(unittest.TestCase):
@@ -16,14 +16,14 @@ class TestSledrun(unittest.TestCase):
             pass
         rodelbahnbox = collections.OrderedDict([
             ('Position', LonLat(9.986508, 47.30982)),
             pass
         rodelbahnbox = collections.OrderedDict([
             ('Position', LonLat(9.986508, 47.30982)),
-            ('Position oben', LonLat(None, None)),
+            ('Position oben', None),
             ('Höhe oben', 1244),
             ('Position unten', LonLat(8.506047, 46.20210)),
             ('Höhe unten', None),
             ('Länge', 5045),
             ('Schwierigkeit', 3),
             ('Lawinen', 2),
             ('Höhe oben', 1244),
             ('Position unten', LonLat(8.506047, 46.20210)),
             ('Höhe unten', None),
             ('Länge', 5045),
             ('Schwierigkeit', 3),
             ('Lawinen', 2),
-            ('Betreiber', 'SchneeFunFit'),
+            ('Betreiber', (True, 'SchneeFunFit')),
             ('Öffentliche Anreise', 2),
             ('Aufstieg möglich', True),
             ('Aufstieg getrennt', (0.0, None)),
             ('Öffentliche Anreise', 2),
             ('Aufstieg möglich', True),
             ('Aufstieg getrennt', (0.0, None)),
@@ -116,7 +116,7 @@ class TestSledrun(unittest.TestCase):
         self.assertEqual(rodelbahnbox['Länge'], 9644)
         self.assertEqual(rodelbahnbox['Schwierigkeit'], 3)
         self.assertEqual(rodelbahnbox['Lawinen'], 2)
         self.assertEqual(rodelbahnbox['Länge'], 9644)
         self.assertEqual(rodelbahnbox['Schwierigkeit'], 3)
         self.assertEqual(rodelbahnbox['Lawinen'], 2)
-        self.assertEqual(rodelbahnbox['Betreiber'], 'McRodel')
+        self.assertEqual(rodelbahnbox['Betreiber'], (True, 'McRodel'))
         self.assertEqual(rodelbahnbox['Öffentliche Anreise'], 3)
         self.assertEqual(rodelbahnbox['Aufstieg möglich'], True)
         self.assertEqual(rodelbahnbox['Aufstieg getrennt'], (0.5, 'Nur unterer Teil'))
         self.assertEqual(rodelbahnbox['Öffentliche Anreise'], 3)
         self.assertEqual(rodelbahnbox['Aufstieg möglich'], True)
         self.assertEqual(rodelbahnbox['Aufstieg getrennt'], (0.5, 'Nur unterer Teil'))
@@ -210,7 +210,7 @@ class TestInn(unittest.TestCase):
         class Inn:
             pass
         gasthausbox = collections.OrderedDict()
         class Inn:
             pass
         gasthausbox = collections.OrderedDict()
-        gasthausbox['Position'] = LonLat(None, None)
+        gasthausbox['Position'] = None
         gasthausbox['Höhe'] = None
         gasthausbox['Betreiber'] = None
         gasthausbox['Sitzplätze'] = None
         gasthausbox['Höhe'] = None
         gasthausbox['Betreiber'] = None
         gasthausbox['Sitzplätze'] = None
@@ -265,7 +265,7 @@ class TestInn(unittest.TestCase):
         inn.image = None
         inn.sledding_list = 'Nein'
         gasthausbox = inn_to_gasthausbox(inn)
         inn.image = None
         inn.sledding_list = 'Nein'
         gasthausbox = inn_to_gasthausbox(inn)
-        self.assertEqual(gasthausbox['Position'], LonLat(None, None))
+        self.assertEqual(gasthausbox['Position'], None)
         self.assertEqual(gasthausbox['Höhe'], None)
         self.assertEqual(gasthausbox['Betreiber'], None)
         self.assertEqual(gasthausbox['Sitzplätze'], None)
         self.assertEqual(gasthausbox['Höhe'], None)
         self.assertEqual(gasthausbox['Betreiber'], None)
         self.assertEqual(gasthausbox['Sitzplätze'], None)
@@ -283,7 +283,7 @@ class TestInn(unittest.TestCase):
         class Inn:
             pass
         gasthausbox = collections.OrderedDict()
         class Inn:
             pass
         gasthausbox = collections.OrderedDict()
-        gasthausbox['Position'] = LonLat(None, None)
+        gasthausbox['Position'] = None
         gasthausbox['Höhe'] = None
         gasthausbox['Betreiber'] = None
         gasthausbox['Sitzplätze'] = None
         gasthausbox['Höhe'] = None
         gasthausbox['Betreiber'] = None
         gasthausbox['Sitzplätze'] = None
@@ -338,7 +338,7 @@ class TestInn(unittest.TestCase):
         inn.image = None
         inn.sledding_list = None
         gasthausbox = inn_to_gasthausbox(inn)
         inn.image = None
         inn.sledding_list = None
         gasthausbox = inn_to_gasthausbox(inn)
-        self.assertEqual(gasthausbox['Position'], LonLat(None, None))
+        self.assertEqual(gasthausbox['Position'], None)
         self.assertEqual(gasthausbox['Höhe'], None)
         self.assertEqual(gasthausbox['Betreiber'], None)
         self.assertEqual(gasthausbox['Sitzplätze'], None)
         self.assertEqual(gasthausbox['Höhe'], None)
         self.assertEqual(gasthausbox['Betreiber'], None)
         self.assertEqual(gasthausbox['Sitzplätze'], None)
@@ -364,6 +364,25 @@ class TestLonlatEle(unittest.TestCase):
         template = latlon_ele_to_template((LonLat(11.468819, 46.942239), 1866), 'Position oben')
         self.assertEqual('{{Position oben|46.942239 N 11.468819 E|1866}}', template)
 
         template = latlon_ele_to_template((LonLat(11.468819, 46.942239), 1866), 'Position oben')
         self.assertEqual('{{Position oben|46.942239 N 11.468819 E|1866}}', template)
 
+    def test_lonlat_to_json(self):
+        actual = lonlat_to_json(LonLat(11.2, 47.6))
+        self.assertEqual({'longitude': 11.2, 'latitude': 47.6}, actual)
+
+    def test_lonlat_ele_to_json(self):
+        actual = lonlat_ele_to_json(LonLat(12.3, 42.9), 420)
+        expected = {'position': {'longitude': 12.3, 'latitude': 42.9}, 'elevation': 420}
+        self.assertEqual(expected, actual)
+
+        actual = lonlat_ele_to_json(LonLat(13.0, 40.1), None)
+        expected = {'position': {'longitude': 13.0, 'latitude': 40.1}}
+        self.assertEqual(expected, actual)
+
+        actual = lonlat_ele_to_json(None, 1580)
+        expected = {'elevation': 1580}
+        self.assertEqual(expected, actual)
+
+        self.assertEqual({}, lonlat_ele_to_json(None, None))
+
 
 class TestWrMap(unittest.TestCase):
     def test_parse_wrmap(self):
 
 class TestWrMap(unittest.TestCase):
     def test_parse_wrmap(self):
@@ -442,7 +461,7 @@ class TestWrMap(unittest.TestCase):
 
         wikitext = wrpylib.wrmwmarkup.create_wrmap(geojson)
         self.assertEqual(wikitext, textwrap.dedent('''\
 
         wikitext = wrpylib.wrmwmarkup.create_wrmap(geojson)
         self.assertEqual(wikitext, textwrap.dedent('''\
-        <wrmap height="400" lat="47.241713" lon="11.214089" width="700" zoom="14">
+        <wrmap lon="11.214089" lat="47.241713" zoom="14" width="700" height="400">
 
         <gasthaus name="Rosskogelhütte" wiki="Rosskogelhütte">47.240689 N 11.190454 E</gasthaus>
         <parkplatz>47.245789 N 11.238971 E</parkplatz>
 
         <gasthaus name="Rosskogelhütte" wiki="Rosskogelhütte">47.240689 N 11.190454 E</gasthaus>
         <parkplatz>47.245789 N 11.238971 E</parkplatz>
@@ -455,3 +474,9 @@ class TestWrMap(unittest.TestCase):
         </rodelbahn>
 
         </wrmap>'''))
         </rodelbahn>
 
         </wrmap>'''))
+
+
+class TestWikiJson(unittest.TestCase):
+    def test_empty_json(self):
+        wiki_text = create_sledrun_wiki({}, None)
+        self.assertIsInstance(wiki_text, str)
index afe28dd7ab476507bdcd11b09909ba35396cc5c0..0cd3937051d602c485dc7b48cbc11ec48ed0b733 100644 (file)
@@ -1,7 +1,7 @@
-#!/usr/bin/python3.4
 import unittest
 from wrpylib.wrvalidators import *
 
 import unittest
 from wrpylib.wrvalidators import *
 
+
 # optional converter
 # ------------------
 
 # optional converter
 # ------------------
 
@@ -814,12 +814,12 @@ class TestRodelbahnbox(unittest.TestCase):
         self.assertEqual(LonLat(12.806522, 46.807218), value['Position'])
         self.assertEqual(LonLat(12.818658, 46.799014), value['Position oben'])
         self.assertEqual(1046, value['Höhe oben'])
         self.assertEqual(LonLat(12.806522, 46.807218), value['Position'])
         self.assertEqual(LonLat(12.818658, 46.799014), value['Position oben'])
         self.assertEqual(1046, value['Höhe oben'])
-        self.assertEqual(LonLat(None, None), value['Position unten'])
+        self.assertEqual(None, value['Position unten'])
         self.assertEqual(None, value['Höhe unten'])
         self.assertEqual(3500, value['Länge'])
         self.assertEqual(2, value['Schwierigkeit'])
         self.assertEqual(1, value['Lawinen'])
         self.assertEqual(None, value['Höhe unten'])
         self.assertEqual(3500, value['Länge'])
         self.assertEqual(2, value['Schwierigkeit'])
         self.assertEqual(1, value['Lawinen'])
-        self.assertEqual('Bringungsgemeinschaft Kreithof-Dolomitenhütte', value['Betreiber'])
+        self.assertEqual((True, 'Bringungsgemeinschaft Kreithof-Dolomitenhütte'), value['Betreiber'])
         self.assertEqual(4, value['Öffentliche Anreise'])
         self.assertEqual(True, value['Aufstieg möglich'])
         self.assertEqual((0.5, None), value['Aufstieg getrennt'])
         self.assertEqual(4, value['Öffentliche Anreise'])
         self.assertEqual(True, value['Aufstieg möglich'])
         self.assertEqual((0.5, None), value['Aufstieg getrennt'])
@@ -840,12 +840,12 @@ class TestRodelbahnbox(unittest.TestCase):
             ('Position', LonLat(12.806522, 46.807218)),
             ('Position oben', LonLat(12.818658, 46.799014)),
             ('Höhe oben', 1046),
             ('Position', LonLat(12.806522, 46.807218)),
             ('Position oben', LonLat(12.818658, 46.799014)),
             ('Höhe oben', 1046),
-            ('Position unten', LonLat(None, None)),
+            ('Position unten', None),
             ('Höhe unten', None),
             ('Länge', 3500),
             ('Schwierigkeit', 2),
             ('Lawinen', 1),
             ('Höhe unten', None),
             ('Länge', 3500),
             ('Schwierigkeit', 2),
             ('Lawinen', 1),
-            ('Betreiber', 'Bringungsgemeinschaft Kreithof-Dolomitenhütte'),
+            ('Betreiber', (True, 'Bringungsgemeinschaft Kreithof-Dolomitenhütte')),
             ('Öffentliche Anreise', 4),
             ('Aufstieg möglich', True),
             ('Aufstieg getrennt', (0.5, None)),
             ('Öffentliche Anreise', 4),
             ('Aufstieg möglich', True),
             ('Aufstieg getrennt', (0.5, None)),
diff --git a/wrpylib/json_tools.py b/wrpylib/json_tools.py
new file mode 100644 (file)
index 0000000..d688309
--- /dev/null
@@ -0,0 +1,129 @@
+from typing import Union, Dict, List
+
+
+JsonTypes = Union[Dict, List, str, int, float, bool, None]
+
+
+class ValidationError(ValueError):
+    pass
+
+
+def _fmt_path(path: List) -> str:
+    return f'schema[{"][".join(map(repr, path))}]'
+
+
+def _resolve_ref_not_recursive(sub_schema: JsonTypes, schema: JsonTypes) -> JsonTypes:
+    """In case the sub_schema is a dict and has direct "$ref" keys,
+    it is replaced by a dict where the "$ref" key is replaced by the corresponding definition in schema.
+    Nested $ref keys are not resolved.
+    Recursive $ref keys are not resolved.
+
+    :param sub_schema: JSON sub-schema where a "$ref" key is possible replaced.
+        The value of "$ref" could be e.g. "#/definitions/position"
+    :param schema: JSON root schema containing definitions for the keys.
+    :raise ValidationError: In case a "$ref" could not be resolved.
+    """
+    if not isinstance(sub_schema, dict) or '$ref' not in sub_schema:
+        return sub_schema
+    ref = sub_schema['$ref']
+    if not isinstance(ref, str):
+        raise ValidationError(f'Type of reference {ref} is not string.')
+    path = ref.split('/')
+    if len(path) == 0 or path[0] != '#':
+        raise ValidationError(f'Unsupported reference {ref}.')
+    ref_schema = schema
+    for p in path[1:]:
+        if not isinstance(ref_schema, dict) or p not in ref_schema:
+            raise ValidationError(f'Reference path {ref} not found in schema.')
+        ref_schema = ref_schema[p]
+    if not isinstance(ref_schema, dict):
+        raise ValidationError(f'Reference path {ref} is no dict.')
+    sub_schema = sub_schema.copy()
+    del sub_schema['$ref']
+    resolved_schema = ref_schema.copy()
+    resolved_schema.update(sub_schema)
+    return resolved_schema
+
+
+def _resolve_ref(sub_schema: JsonTypes, schema: JsonTypes) -> JsonTypes:
+    """Same as `_resolve_ref_not_recursive` but recursively resolves $ref keys.
+    However, does not resolve nested $ref keys.
+    """
+    resolved_schema = sub_schema
+    while isinstance(resolved_schema, dict) and '$ref' in resolved_schema:
+        resolved_schema = _resolve_ref_not_recursive(resolved_schema, schema)
+    return resolved_schema
+
+
+def _order_json_keys_string(sub_value: JsonTypes, sub_schema: JsonTypes, schema: JsonTypes, path: List) -> str:
+    if not isinstance(sub_value, str):
+        raise ValidationError(f'Type of {_fmt_path(path)} needs to be string (Python str).')
+    return sub_value
+
+
+def _order_json_keys_number(sub_value: JsonTypes, sub_schema: JsonTypes,
+                            schema: JsonTypes, path: List) -> Union[int, float]:
+    if not isinstance(sub_value, (int, float)) or isinstance(sub_value, bool):
+        raise ValidationError(f'Type of {_fmt_path(path)} needs to be number (Python int or float).')
+    return sub_value
+
+
+def _order_json_keys_object(sub_value: JsonTypes, sub_schema: JsonTypes, schema: JsonTypes, path: List) -> Dict:
+    if not isinstance(sub_value, dict):
+        raise ValidationError(f'Type of {_fmt_path(path)} needs to be object (Python dict).')
+    v = sub_value.copy()
+    p = sub_schema.get('properties', {})
+    result = {}
+    for key in p:
+        if key in v:
+            result[key] = _order_json_keys(v.pop(key), p[key], schema, path + [key])
+        else:
+            if key in sub_schema.get('required', []):
+                raise ValidationError(f'Required key "{key}" not present ({_fmt_path(path)}).')
+    if len(v) > 0:
+        if sub_schema.get('additionalProperties', True):
+            # strictly speaking additionalProperties could be more complicated than boolean
+            result.update(v)
+        else:
+            raise ValidationError(f'Keys not allowed in {_fmt_path(path)}: {", ".join(v)}')
+    return result
+
+
+def _order_json_keys_array(sub_value: JsonTypes, sub_schema: JsonTypes, schema: JsonTypes, path: List) -> List:
+    if not isinstance(sub_value, list):
+        raise ValidationError(f'Type of {"".join(_fmt_path(path))} needs to be array (Python list).')
+    s = sub_schema.get('items', True)
+    return [_order_json_keys(v, s, schema, path + [i]) for i, v in enumerate(sub_value)]
+
+
+def _order_json_keys_boolean(sub_value: JsonTypes, sub_schema: JsonTypes, schema: JsonTypes, path: List) -> bool:
+    if not isinstance(sub_value, bool):
+        raise ValidationError(f'Type of {_fmt_path(path)} needs to be boolean (Python bool).')
+    return sub_value
+
+
+def _order_json_keys_null(sub_value: JsonTypes, sub_schema: JsonTypes, schema: JsonTypes, path: List) -> None:
+    if sub_value is not None:
+        raise ValidationError(f'Type of {_fmt_path(path)} needs to be null (Python None).')
+    return sub_value
+
+
+def _order_json_keys(sub_value: JsonTypes, sub_schema: JsonTypes, schema: JsonTypes, path: List) -> JsonTypes:
+    if isinstance(sub_schema, bool):
+        if sub_schema:
+            return sub_value
+        raise ValidationError(f'Value {sub_value} not allowed in {_fmt_path(path)}.')
+    if isinstance(sub_schema, dict):
+        sub_schema = _resolve_ref(sub_schema, schema)
+    return {
+        'string': _order_json_keys_string,
+        'number': _order_json_keys_number,
+        'object': _order_json_keys_object,
+        'array': _order_json_keys_array,
+        'boolean': _order_json_keys_boolean,
+        'null': _order_json_keys_null,
+    }[sub_schema['type']](sub_value, sub_schema, schema, path)
+
+
+def order_json_keys(value: JsonTypes, schema: JsonTypes) -> JsonTypes:
+    return _order_json_keys(value, schema, schema, [])
index d522d8685246f9f8c3d1b8dd004155d6d375214e..5d560beba3d68ead296392abf4df533de9004957 100644 (file)
@@ -1,10 +1,10 @@
 """This module contains code to make the access of MediaWiki tables
 easy. The module uses sqlalchemy to access the database.
 """
 """This module contains code to make the access of MediaWiki tables
 easy. The module uses sqlalchemy to access the database.
 """
-from sqlalchemy import Table, Column, types
+from sqlalchemy import Table, Column, types, MetaData
 
 
 
 
-def page_table(metadata):
+def page_table(metadata: MetaData) -> Table:
     """Returns the sqlalchemy Table representing the "page" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
     """Returns the sqlalchemy Table representing the "page" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
@@ -18,15 +18,15 @@ def page_table(metadata):
         Column("page_is_new", types.Integer, nullable=False),
         Column("page_random", types.Float, nullable=False),
         Column("page_touched", types.String(14, convert_unicode='force'), nullable=False),
         Column("page_is_new", types.Integer, nullable=False),
         Column("page_random", types.Float, nullable=False),
         Column("page_touched", types.String(14, convert_unicode='force'), nullable=False),
+        Column("page_links_updated", types.String(14, convert_unicode='force')),
         Column("page_latest", types.Integer, nullable=False),
         Column("page_len", types.Integer, nullable=False),
         Column("page_content_model", types.String(32, convert_unicode='force')),
         Column("page_latest", types.Integer, nullable=False),
         Column("page_len", types.Integer, nullable=False),
         Column("page_content_model", types.String(32, convert_unicode='force')),
-        Column("page_links_updated", types.String(14, convert_unicode='force')),
         Column("page_lang", types.String(35, convert_unicode='force')),
     )
 
 
         Column("page_lang", types.String(35, convert_unicode='force')),
     )
 
 
-def revision_table(metadata):
+def revision_table(metadata: MetaData) -> Table:
     """Returns the sqlalchemy Table representing the "revision" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
     """Returns the sqlalchemy Table representing the "revision" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
@@ -34,19 +34,45 @@ def revision_table(metadata):
         "revision", metadata,
         Column("rev_id", types.Integer, nullable=False, primary_key=True),
         Column("rev_page", types.Integer, nullable=False, primary_key=True),
         "revision", metadata,
         Column("rev_id", types.Integer, nullable=False, primary_key=True),
         Column("rev_page", types.Integer, nullable=False, primary_key=True),
-        Column("rev_text_id", types.Integer, nullable=False),
-        Column("rev_comment", types.String(convert_unicode='force'), nullable=False),  # tinyblob NOT NULL
-        Column("rev_user", types.Integer, nullable=False),
-        Column("rev_user_text", types.String(255, convert_unicode='force'), nullable=False),
+        Column("rev_comment_id", types.Integer, nullable=False),
+        Column("rev_actor", types.Integer, nullable=False),
         Column("rev_timestamp", types.String(14, convert_unicode='force'), nullable=False),
         Column("rev_minor_edit", types.Integer, nullable=False),
         Column("rev_deleted", types.Integer, nullable=False),
         Column("rev_timestamp", types.String(14, convert_unicode='force'), nullable=False),
         Column("rev_minor_edit", types.Integer, nullable=False),
         Column("rev_deleted", types.Integer, nullable=False),
-        Column("rev_len", types.Integer, nullable=False),
-        Column("rev_parent_id", types.Integer, nullable=False),
+        Column("rev_len", types.Integer, nullable=True),
+        Column("rev_parent_id", types.Integer, nullable=True),
+        Column("rev_sha1", types.String(32), nullable=False),
+    )
+
+
+def slots_table(metadata: MetaData) -> Table:
+    """Returns the sqlalchemy Table representing the "slots" table in MediaWiki.
+    :param metadata: metadata = sqlalchemy.MetaData()
+    """
+    return Table(
+        "slots", metadata,
+        Column("slot_revision_id", types.Integer, nullable=False, primary_key=True),
+        Column("slot_role_id", types.Integer, nullable=False, primary_key=True),
+        Column("slot_content_id", types.Integer, nullable=False),
+        Column("slot_origin", types.Integer, nullable=False),
+    )
+
+
+def content_table(metadata: MetaData) -> Table:
+    """Returns the sqlalchemy Table representing the "content" table in MediaWiki.
+    :param metadata: metadata = sqlalchemy.MetaData()
+    """
+    return Table(
+        "content", metadata,
+        Column("content_id", types.Integer, nullable=False, primary_key=True),
+        Column("content_size", types.Integer, nullable=False),
+        Column("content_sha1", types.String(32, convert_unicode='force'), nullable=False),
+        Column("content_model", types.Integer, nullable=False),
+        Column("content_address", types.String(255, convert_unicode='force'), nullable=False),
     )
 
 
     )
 
 
-def text_table(metadata):
+def text_table(metadata: MetaData) -> Table:
     """Returns the sqlalchemy Table representing the "text" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
     """Returns the sqlalchemy Table representing the "text" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
@@ -58,7 +84,7 @@ def text_table(metadata):
     )
 
 
     )
 
 
-def user_table(metadata):
+def user_table(metadata: MetaData) -> Table:
     """Returns the sqlalchemy Table representing the "user" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
     """Returns the sqlalchemy Table representing the "user" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
@@ -81,7 +107,7 @@ def user_table(metadata):
     )
 
 
     )
 
 
-def user_groups_table(metadata):
+def user_groups_table(metadata: MetaData) -> Table:
     """
     Returns the sqlalchemy Table representing the "user_groups" table in MediaWiki.
 
     """
     Returns the sqlalchemy Table representing the "user_groups" table in MediaWiki.
 
@@ -95,7 +121,7 @@ def user_groups_table(metadata):
     )
 
 
     )
 
 
-def categorylinks_table(metadata):
+def categorylinks_table(metadata: MetaData) -> Table:
     """Returns the sqlalchemy Table representing the "categorylinks" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
     """Returns the sqlalchemy Table representing the "categorylinks" table in MediaWiki.
     :param metadata: metadata = sqlalchemy.MetaData()
     """
index a5d83ba5714c36f90f961dfb3198e4a76eec86e5..c14e96256e924ed5855a3a38a47ce40241207e92 100644 (file)
@@ -6,11 +6,27 @@ Other Python MediaWiki parsers:
 * mwlib http://code.pediapress.com/wiki/wiki
 * https://www.mediawiki.org/wiki/Alternative_parsers
 """
 * mwlib http://code.pediapress.com/wiki/wiki
 * https://www.mediawiki.org/wiki/Alternative_parsers
 """
-from typing import Optional
+from typing import Optional, Dict, List
 
 from mwparserfromhell.nodes import Template
 
 
 
 from mwparserfromhell.nodes import Template
 
 
+def create_template(name: str, args: List[str], kwargs: Optional[Dict[str, str]] = None) -> Template:
+    """Creates a mwparserfromhell template with from a dictionary (string: string)
+
+    :param name: Name of the template
+    :param args: list of unnamed parameters
+    :param kwargs: named parameters
+    """
+    template = Template(name)
+    for i, value in enumerate(args, 1):
+        template.add(str(i), value, False)
+    if kwargs is not None:
+        for key, value in kwargs.items():
+            template.add(key, value, True)
+    return template
+
+
 def format_template_table(template: Template, keylen: Optional[int] = None):
     """Reformat the given template to be tabular. The template is modified in-place
 
 def format_template_table(template: Template, keylen: Optional[int] = None):
     """Reformat the given template to be tabular. The template is modified in-place
 
diff --git a/wrpylib/templates/sledrun_wiki.txt b/wrpylib/templates/sledrun_wiki.txt
new file mode 100644 (file)
index 0000000..ddcf042
--- /dev/null
@@ -0,0 +1,104 @@
+{% block content %}
+{%- macro weblink(value) -%}
+{% if value.text %}[{{ value.url }} {{ value.text }}]{% else %}{{ value.url }}{% endif %}
+{%- endmacro %}
+{%- macro wr_page(value) -%}
+[[{{ value.title }}{% if value.name %}|{{ value.name }}{% endif %}]]
+{%- endmacro -%}
+== Allgemeines ==
+{{ rodelbahnbox }}
+{{ description | default("*Hier wird die Rodelbahn allgemein beschrieben.*") | from_markdown | trim }}
+{% raw %}
+* {{Position oben}}
+* {{Position unten}}
+* {{Höhenunterschied}}
+* {{Bahnlänge}}
+* {{Gehzeit}}
+{% endraw -%}
+* '''Beleuchtung''': {{ nightlight_description | default( nightlight_possible | default('*Unbekannt*') ) | from_markdown | trim }}
+* '''Rodelverleih''': {{ sled_rental_description | default( sled_rental_direct | german_bool | default( '*Unbekannt*' ) ) | from_markdown | trim }}
+* '''Schneelage - Auskunft''':
+{%- for info in info_phone %}
+** {{ info.phone }} ({{ info.name }})
+{%- endfor %}
+{%- for info in info_web %}
+** {{ weblink(info) }}
+{%- endfor %}
+* '''Betreiber''': {{ operator }}
+* '''Hütten''':
+{%- for info in gastronomy %}
+** {% if info.wr_page %}{{ wr_page(info.wr_page) }}{% endif %}
+{%- endfor %}
+* '''Andere Rodelbahnen''':
+{%- for info in sledrun_list %}
+** {{ wr_page(info) }}
+{% endfor -%}
+* '''Siehe auch''':
+{%- for info in see_also %}
+** {{ weblink(info) }}
+{%- endfor %}
+
+{% raw %}{{Buttonleiste{% endraw -%}
+|Bericht={{ allow_reports | default(true) | german_bool }}
+{%- if forum_id %}|ForumId={{ forum_id }}{% endif %}
+{%- if position %}|Wetter=Ja{% endif -%}
+|Korrektur=Seite{% if correction_email %}|Korrektur_To={{ correction_email }}{% endif -%}
+{% if freizeitticket_tyrol %}|Freizeitticket=Ja{% endif -%}
+{% if regio_card_tyrol %}|Regiocard=Ja{% endif -%}}}
+{% raw %}{{Clear}}{% endraw %}
+
+
+== Landkarte ==
+{% if map_json is none -%}
+''leider ist derzeit keine [[Winterrodeln:Landkarte|Landkarte]] zu dieser Rodelbahn vorhanden''
+{%- else -%}
+{{ h.create_wrmap(map_json) }}
+{% raw %}{{Landkarte Legende}}<br/>{{GoogleMaps Hinweis}}{% endraw %}
+{%- endif %}
+
+
+== Anreise mit öffentlichen Verkehrsmitteln ==
+{% if public_transport_description -%}
+{{ public_transport_description | from_markdown | trim }}
+{%- else -%}
+''Hier wird die Anreise mit öffentlichen Verkehrsmitteln beschrieben.''
+{%- endif %}
+
+{% for pt_stop in public_transport_stops -%}
+* {{ h.list_template('Haltestelle', [pt_stop.municipality, pt_stop.name_local, h.json_pos_ele_position(pt_stop.get('position', '')), h.json_pos_ele_elevation(pt_stop.get('position', ''))]) }}
+{% for template_name in ['monitor_template', 'route_arrival_template', 'route_departure_template'] -%}
+{% if template_name in pt_stop -%}
+** {{ h.json_template(pt_stop[template_name]) }}
+{% endif %}
+{%- endfor %}
+{%- endfor -%}
+{% for pt_line in public_transport_lines -%}
+{% if loop.first -%}
+* '''Fahrplan''':
+{%- endif %}
+** {% if pt_line.timetable_template is defined %}{{ h.json_template(pt_line.timetable_template) }}
+   {%- else %}{{ pt_line.name }}{% endif %}
+{%- endfor %}
+
+
+== Anreise mit dem Auto ==
+{% if car_description -%}
+{{ car_description | from_markdown | trim }}
+{%- else -%}
+''Hier wird die Anreise mit dem Auto beschrieben.''
+{%- endif %}
+
+{% for parking in sledrun_json.car_parking -%}
+* '''Parkplatz''': {{ h.list_template('Parkplatz', [h.json_pos_ele_position(parking.position), h.json_pos_ele_elevation(parking.position)]) }}
+{% endfor -%}
+{% for distance_info in sledrun_json.car_distances -%}
+{% if loop.first -%}
+* '''Entfernung''':
+{% endif -%}
+** {{ distance_info.route }}: {{ distance_info.km }} km
+{% endfor %}
+
+{% raw %}{{Rodelbahnzustand|Forumlink={% endraw %}{{ sledrun_json.forum_id }}}}
+
+[[Kategorie:Rodelbahn]]
+{% endblock %}
index f340d7a194ed4743185c452d7bf94925f2ad3318..ce0163fbf4a9420d6382110775f62b247ce24cb2 100644 (file)
@@ -1,10 +1,9 @@
 import json
 import math
 from abc import abstractmethod
 import json
 import math
 from abc import abstractmethod
-from collections import namedtuple
 from typing import Optional, Tuple
 
 from typing import Optional, Tuple
 
-import fiona.crs
+import fiona.crs  # pip install Fiona
 import fiona.transform
 import owslib.crs
 import owslib.wms
 import fiona.transform
 import owslib.crs
 import owslib.wms
index f8d9291a2e4e28078488307880e8a5ee43052eb7..538be78b9a704856fe4b9a388d209adb6cd0ff98 100644 (file)
@@ -1,5 +1,4 @@
 import datetime
 import datetime
-import json
 import re
 from sqlalchemy import orm
 from sqlalchemy.orm.session import Session
 import re
 from sqlalchemy import orm
 from sqlalchemy.orm.session import Session
index deceff8922b2cce00300479c605eca2702b0e19b..0b26415224aaa2cb52c8a84c0a5ed9f7b092ec3e 100644 (file)
@@ -1,5 +1,7 @@
 """Contains functions that maintain/update the cache tables."""
 """Contains functions that maintain/update the cache tables."""
-from sqlalchemy import schema
+import mwparserfromhell
+from sqlalchemy import schema, Table
+from sqlalchemy.engine import Connection
 from sqlalchemy.sql import select
 from sqlalchemy.sql.expression import func as sqlfunc
 from osgeo import ogr
 from sqlalchemy.sql import select
 from sqlalchemy.sql.expression import func as sqlfunc
 from osgeo import ogr
@@ -10,6 +12,16 @@ class UpdateCacheError(RuntimeError):
     pass
 
 
     pass
 
 
+def _get_mw_text(connection: Connection, text: Table, content_address: str) -> str:
+    parts = content_address.split(':')  # e.g. 'tt:15664'
+    if len(parts) != 2 or parts[0] != 'tt':
+        raise ValueError('Content has unexpected format')
+    old_id = int(parts[1])
+    query = select([text], text.c.old_id == old_id)
+    text_row = connection.execute(query).fetchone()
+    return text_row.old_text
+
+
 def update_wrsledruncache(connection):
     """Updates the wrsledruncache table from the wiki. If convert errors occur, an UpdateCacheError exception
     is raised. No other exception type should be raised under normal circumstances.
 def update_wrsledruncache(connection):
     """Updates the wrsledruncache table from the wiki. If convert errors occur, an UpdateCacheError exception
     is raised. No other exception type should be raised under normal circumstances.
@@ -22,48 +34,44 @@ def update_wrsledruncache(connection):
     wrsledruncache = wrmwdb.wrsledruncache_table(metadata)
     page = mwdb.page_table(metadata)
     categorylinks = mwdb.categorylinks_table(metadata)
     wrsledruncache = wrmwdb.wrsledruncache_table(metadata)
     page = mwdb.page_table(metadata)
     categorylinks = mwdb.categorylinks_table(metadata)
-    revision = mwdb.revision_table(metadata)
+    slots = mwdb.slots_table(metadata)
+    content = mwdb.content_table(metadata)
     text = mwdb.text_table(metadata)
 
     class Sledrun:
         pass
 
     text = mwdb.text_table(metadata)
 
     class Sledrun:
         pass
 
-    transaction = connection.begin()
-
-    # Query all sled runs
-    q = select(
-        [page, categorylinks, revision, text],
-        (page.c.page_latest == revision.c.rev_id) & (text.c.old_id == revision.c.rev_text_id) &
-        (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Rodelbahn'))
-    sledrun_pages = connection.execute(q)
-    # Original SQL:
-    # sql = u"select page_id, rev_id, old_id, page_title, old_text, 'In_Arbeit' in
-    # (select cl_to from categorylinks where cl_from=page_id) as under_construction
-    # from page, revision, text, categorylinks where page_latest=rev_id and old_id=rev_text_id and
-    # cl_from=page_id and cl_to='Rodelbahn' order by page_title"
-    
-    # Delete all existing entries in wrsledruncache
-    # We rely on transactions MySQL InnoDB
-    connection.execute(wrsledruncache.delete())
-    
-    # Refill wrsledruncache table
-    for sledrun_page in sledrun_pages:
-        try:
-            rodelbahnbox = wrvalidators.rodelbahnbox_from_str(sledrun_page.old_text)
-            sledrun = wrmwmarkup.sledrun_from_rodelbahnbox(rodelbahnbox, Sledrun())
-            sledrun.page_id = sledrun_page.page_id
-            sledrun.page_title = sledrun_page.page_title
-            sledrun.name_url = wrvalidators.sledrun_page_title_to_pretty_url(sledrun_page.page_title)
-            sledrun.under_construction = connection.execute(select(
-                [sqlfunc.count()],
-                (categorylinks.c.cl_from == sledrun_page.page_id) &
-                (categorylinks.c.cl_to == 'In_Arbeit')).alias('x')).fetchone()[0] > 0
-            connection.execute(wrsledruncache.insert(sledrun.__dict__))
-        except ValueError as e:
-            transaction.rollback()
-            error_msg = f"Error at sled run '{sledrun_page.page_title}': {e}"
-            raise UpdateCacheError(error_msg, sledrun_page.page_title, e)
-    transaction.commit()
+    try:
+        with connection.begin():
+
+            # Query all sled runs
+            q = select(
+                [page, categorylinks, slots, content],
+                (page.c.page_latest == slots.c.slot_revision_id) & (slots.c.slot_content_id == content.c.content_id) &
+                (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Rodelbahn'))
+            sledrun_pages = connection.execute(q)
+
+            # Delete all existing entries in wrsledruncache
+            # We rely on transactions MySQL InnoDB
+            connection.execute(wrsledruncache.delete())
+
+            # Refill wrsledruncache table
+            for sledrun_page in sledrun_pages:
+                old_text = _get_mw_text(connection, text, sledrun_page.content_address)
+                rodelbahnbox = wrvalidators.rodelbahnbox_from_str(old_text)
+                sledrun = wrmwmarkup.sledrun_from_rodelbahnbox(rodelbahnbox, Sledrun())
+                sledrun.page_id = sledrun_page.page_id
+                sledrun.page_title = sledrun_page.page_title
+                sledrun.name_url = wrvalidators.sledrun_page_title_to_pretty_url(sledrun_page.page_title)
+                sledrun.under_construction = connection.execute(select(
+                    [sqlfunc.count()],
+                    (categorylinks.c.cl_from == sledrun_page.page_id) &
+                    (categorylinks.c.cl_to == 'In_Arbeit')).alias('x')).fetchone()[0] > 0
+                connection.execute(wrsledruncache.insert(sledrun.__dict__))
+
+    except ValueError as e:
+        error_msg = f"Error at sled run '{sledrun_page.page_title}': {e}"
+        raise UpdateCacheError(error_msg, sledrun_page.page_title, e)
 
 
 def update_wrinncache(connection):
 
 
 def update_wrinncache(connection):
@@ -78,43 +86,44 @@ def update_wrinncache(connection):
     wrinncache = wrmwdb.wrinncache_table(metadata)
     page = mwdb.page_table(metadata)
     categorylinks = mwdb.categorylinks_table(metadata)
     wrinncache = wrmwdb.wrinncache_table(metadata)
     page = mwdb.page_table(metadata)
     categorylinks = mwdb.categorylinks_table(metadata)
-    revision = mwdb.revision_table(metadata)
+    slots = mwdb.slots_table(metadata)
+    content = mwdb.content_table(metadata)
     text = mwdb.text_table(metadata)
 
     class Inn:
         pass
 
     text = mwdb.text_table(metadata)
 
     class Inn:
         pass
 
-    transaction = connection.begin()
-
-    # Query all inns
-    q = select(
-        [page, categorylinks, revision, text],
-        (page.c.page_latest == revision.c.rev_id) & (text.c.old_id == revision.c.rev_text_id) &
-        (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Gasthaus'))
-    inn_pages = connection.execute(q)
-        
-    # Delete all existing entries in wrinncache
-    # We rely on transactions MySQL InnoDB
-    connection.execute(wrinncache.delete())
-        
-    # Refill wrinncache table
-    for inn_page in inn_pages:
-        try:
-            gasthausbox = wrvalidators.gasthausbox_from_str(inn_page.old_text)
-            inn = wrmwmarkup.inn_from_gasthausbox(gasthausbox, Inn())
-            inn.page_id = inn_page.page_id
-            inn.page_title = inn_page.page_title
-            inn.under_construction = connection.execute(select(
-                [sqlfunc.count()],
-                (categorylinks.c.cl_from == inn_page.page_id) &
-                (categorylinks.c.cl_to == 'In_Arbeit')).alias('x')) \
-                .fetchone()[0] > 0  # it would be better to do this in the query above
-            connection.execute(wrinncache.insert(inn.__dict__))
-        except ValueError as e:
-            transaction.rollback()
-            error_msg = f"Error as inn '{inn_page.page_title}': {e}"
-            raise UpdateCacheError(error_msg, inn_page.page_title, e)
-    transaction.commit()
+    try:
+        with connection.begin():
+
+            # Query all inns
+            q = select(
+                [page, categorylinks, slots, content],
+                (page.c.page_latest == slots.c.slot_revision_id) & (slots.c.slot_content_id == content.c.content_id) &
+                (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Gasthaus'))
+            inn_pages = connection.execute(q)
+
+            # Delete all existing entries in wrinncache
+            # We rely on transactions MySQL InnoDB
+            connection.execute(wrinncache.delete())
+
+            # Refill wrinncache table
+            for inn_page in inn_pages:
+                old_text = _get_mw_text(connection, text, inn_page.content_address)
+                gasthausbox = wrvalidators.gasthausbox_from_str(old_text)
+                inn = wrmwmarkup.inn_from_gasthausbox(gasthausbox, Inn())
+                inn.page_id = inn_page.page_id
+                inn.page_title = inn_page.page_title
+                inn.under_construction = connection.execute(select(
+                    [sqlfunc.count()],
+                    (categorylinks.c.cl_from == inn_page.page_id) &
+                    (categorylinks.c.cl_to == 'In_Arbeit')).alias('x')) \
+                    .fetchone()[0] > 0  # it would be better to do this in the query above
+                connection.execute(wrinncache.insert(inn.__dict__))
+
+    except ValueError as e:
+        error_msg = f"Error as inn '{inn_page.page_title}': {e}"
+        raise UpdateCacheError(error_msg, inn_page.page_title, e)
 
 
 def update_wrreportcache(connection, page_id=None):
 
 
 def update_wrreportcache(connection, page_id=None):
@@ -129,38 +138,37 @@ def update_wrreportcache(connection, page_id=None):
     """
     metadata = schema.MetaData()
     wrreportcache = wrmwdb.wrreportcache_table(metadata)
     """
     metadata = schema.MetaData()
     wrreportcache = wrmwdb.wrreportcache_table(metadata)
-    transaction = connection.begin()
-
-    # Delete the datasets we are going to update
-    sql_del = wrreportcache.delete()
-    if page_id is not None:
-        sql_del = sql_del.where(wrreportcache.c.page_id == page_id)
-    connection.execute(sql_del)
-
-    def insert_row(connection_, row_list_):
-        if len(row_list_) == 0:
-            return
-        # Insert the report
-        row_ = dict(row_list_[0])
-        connection_.execute(wrreportcache.insert(values=row_))
-
-    # Select the rows to update
-    sql = 'select page_id, page_title, wrreport.id as report_id, date_report, `condition`, description, author_name, ' \
-          'if(author_userid is null, null, author_username) as author_username from wrreport ' \
-          'where {0}`condition` is not null and date_invalid > now() and delete_date is null ' \
-          'order by page_id, date_report desc, date_entry desc' \
-          .format('' if page_id is None else f'page_id={page_id} and ')
-    cursor = connection.execute(sql)
-    page_id = None
-    row_list = []
-    for row in cursor:
-        if row.page_id != page_id:
-            insert_row(connection, row_list)
-            page_id = row.page_id
-            row_list = []
-        row_list.append(row)
-    insert_row(connection, row_list)
-    transaction.commit()
+    with connection.begin():
+        # Delete the datasets we are going to update
+        sql_del = wrreportcache.delete()
+        if page_id is not None:
+            sql_del = sql_del.where(wrreportcache.c.page_id == page_id)
+        connection.execute(sql_del)
+
+        def insert_row(connection_, row_list_):
+            if len(row_list_) == 0:
+                return
+            # Insert the report
+            row_ = dict(row_list_[0])
+            connection_.execute(wrreportcache.insert(values=row_))
+
+        # Select the rows to update
+        sql = 'select page_id, page_title, wrreport.id as report_id, date_report, `condition`, description, ' \
+              'author_name, ' \
+              'if(author_userid is null, null, author_username) as author_username from wrreport ' \
+              'where {0}`condition` is not null and date_invalid > now() and delete_date is null ' \
+              'order by page_id, date_report desc, date_entry desc' \
+              .format('' if page_id is None else f'page_id={page_id} and ')
+        cursor = connection.execute(sql)
+        page_id = None
+        row_list = []
+        for row in cursor:
+            if row.page_id != page_id:
+                insert_row(connection, row_list)
+                page_id = row.page_id
+                row_list = []
+            row_list.append(row)
+        insert_row(connection, row_list)
 
 
 def update_wrmapcache(connection):
 
 
 def update_wrmapcache(connection):
@@ -176,87 +184,81 @@ def update_wrmapcache(connection):
     metadata = schema.MetaData()
     page = mwdb.page_table(metadata)
     categorylinks = mwdb.categorylinks_table(metadata)
     metadata = schema.MetaData()
     page = mwdb.page_table(metadata)
     categorylinks = mwdb.categorylinks_table(metadata)
-    revision = mwdb.revision_table(metadata)
+    slots = mwdb.slots_table(metadata)
+    content = mwdb.content_table(metadata)
     text = mwdb.text_table(metadata)
 
     text = mwdb.text_table(metadata)
 
-    transaction = connection.begin()
-
-    # Query all sledruns
-    q = select(
-        [page, categorylinks, revision, text],
-        (page.c.page_latest == revision.c.rev_id) & (text.c.old_id == revision.c.rev_text_id) &
-        (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Rodelbahn'))
-    sledrun_pages = connection.execute(q)
-    # Original SQL:
-    # sql = u"select page_id, rev_id, old_id, page_title, old_text, 'In_Arbeit' in
-    # (select cl_to from categorylinks where cl_from=page_id) as under_construction
-    # from page, revision, text, categorylinks where page_latest=rev_id and old_id=rev_text_id and cl_from=page_id
-    # and cl_to='Rodelbahn' order by page_title"
-    
-    # Delete all existing entries in wrmappointcache
-    # We rely on transactions MySQL InnoDB
-    connection.execute('delete from wrmappointcache')
-    connection.execute('delete from wrmappathcache')
-    
-    # Refill wrmappointcache and wrmappathcache tables
-    for sledrun_page in sledrun_pages:
-        try:
-            import mwparserfromhell
-            wikicode = mwparserfromhell.parse(sledrun_page.old_text)
-            wrmap_list = wikicode.filter_tags(recursive=False, matches=lambda tag: tag.tag == 'wrmap')
-            if len(wrmap_list) == 0:
-                continue  # not wrmap in page
-            if len(wrmap_list) > 1:
-                raise UpdateCacheError(
-                    f'{len(wrmap_list)} <wrmap ...> entries found in article "{sledrun_page.page_title}"')
-            wrmap = wrmap_list[0]
-            geojson = wrmwmarkup.parse_wrmap(str(wrmap))
-
-            for feature in geojson['features']:
-                properties = feature['properties']
-                coordinates = feature['geometry']['coordinates']
-
-                # Points
-                if properties['type'] in wrmwmarkup.WRMAP_POINT_TYPES:
-                    lon, lat = coordinates
-                    label = properties.get('name')
-                    point_types = {
-                        'gasthaus': 'hut',
-                        'haltestelle': 'busstop',
-                        'parkplatz': 'carpark',
-                        'achtung': 'warning',
-                        'foto': 'photo',
-                        'verleih': 'rental',
-                        'punkt': 'point'
-                    }
-                    point_type = point_types[properties['type']]
-                    sql = 'insert into wrmappointcache (page_id, type, point, label) values (%s, %s, POINT(%s, %s), %s)'
-                    connection.execute(sql, (sledrun_page.page_id, point_type, lon, lat, label))
-
-                # Paths
-                elif properties['type'] in wrmwmarkup.WRMAP_LINE_TYPES:
-                    path_types = {
-                        'rodelbahn': 'sledrun',
-                        'gehweg': 'walkup',
-                        'alternative': 'alternative',
-                        'lift': 'lift',
-                        'anfahrt': 'recommendedcarroute',
-                        'linie': 'line'}
-                    path_type = path_types[properties['type']]
-                    path = ", ".join([f"{lon} {lat}" for lon, lat in coordinates])
-                    path = f'LineString({path})'
-                    if path_type == 'recommendedcarroute':
-                        continue
-                    sql = 'insert into wrmappathcache (path, page_id, type) values (GeomFromText(%s), %s, %s)'
-                    connection.execute(sql, (path, sledrun_page.page_id, path_type))
-
-                else:
-                    raise RuntimeError(f'Unknown feature type {properties["type"]}')
-        except RuntimeError as e:
-            error_msg = f"Error at sledrun '{sledrun_page.page_title}': {e}"
-            transaction.rollback()
-            raise UpdateCacheError(error_msg, sledrun_page.page_title, e)
-    transaction.commit()
+    try:
+        with connection.begin():
+
+            # Query all sledruns
+            q = select(
+                [page, categorylinks, slots, content],
+                (page.c.page_latest == slots.c.slot_revision_id) & (slots.c.slot_content_id == content.c.content_id) &
+                (categorylinks.c.cl_from == page.c.page_id) & (categorylinks.c.cl_to == 'Rodelbahn'))
+            sledrun_pages = connection.execute(q)
+
+            # Delete all existing entries in wrmappointcache
+            # We rely on transactions MySQL InnoDB
+            connection.execute('delete from wrmappointcache')
+            connection.execute('delete from wrmappathcache')
+
+            # Refill wrmappointcache and wrmappathcache tables
+            for sledrun_page in sledrun_pages:
+                old_text = _get_mw_text(connection, text, sledrun_page.content_address)
+                wikicode = mwparserfromhell.parse(old_text)
+                wrmap_list = wikicode.filter_tags(recursive=False, matches=lambda tag: tag.tag == 'wrmap')
+                if len(wrmap_list) == 0:
+                    continue  # not wrmap in page
+                if len(wrmap_list) > 1:
+                    raise UpdateCacheError(
+                        f'{len(wrmap_list)} <wrmap ...> entries found in article "{sledrun_page.page_title}"')
+                wrmap = wrmap_list[0]
+                geojson = wrmwmarkup.parse_wrmap(str(wrmap))
+
+                for feature in geojson['features']:
+                    properties = feature['properties']
+                    coordinates = feature['geometry']['coordinates']
+
+                    # Points
+                    if properties['type'] in wrmwmarkup.WRMAP_POINT_TYPES:
+                        lon, lat = coordinates
+                        label = properties.get('name')
+                        point_types = {
+                            'gasthaus': 'hut',
+                            'haltestelle': 'busstop',
+                            'parkplatz': 'carpark',
+                            'achtung': 'warning',
+                            'foto': 'photo',
+                            'verleih': 'rental',
+                            'punkt': 'point'
+                        }
+                        point_type = point_types[properties['type']]
+                        sql = 'insert into wrmappointcache (page_id, type, point, label) values (%s, %s, POINT(%s, %s), %s)'
+                        connection.execute(sql, (sledrun_page.page_id, point_type, lon, lat, label))
+
+                    # Paths
+                    elif properties['type'] in wrmwmarkup.WRMAP_LINE_TYPES:
+                        path_types = {
+                            'rodelbahn': 'sledrun',
+                            'gehweg': 'walkup',
+                            'alternative': 'alternative',
+                            'lift': 'lift',
+                            'anfahrt': 'recommendedcarroute',
+                            'linie': 'line'}
+                        path_type = path_types[properties['type']]
+                        path = ", ".join([f"{lon} {lat}" for lon, lat in coordinates])
+                        path = f'LineString({path})'
+                        if path_type == 'recommendedcarroute':
+                            continue
+                        sql = 'insert into wrmappathcache (path, page_id, type) values (GeomFromText(%s), %s, %s)'
+                        connection.execute(sql, (path, sledrun_page.page_id, path_type))
+
+                    else:
+                        raise RuntimeError(f'Unknown feature type {properties["type"]}')
+    except RuntimeError as e:
+        error_msg = f"Error at sledrun '{sledrun_page.page_title}': {e}"
+        raise UpdateCacheError(error_msg, sledrun_page.page_title, e)
 
 
 def update_wrregioncache(connection):
 
 
 def update_wrregioncache(connection):
@@ -275,35 +277,32 @@ def update_wrregioncache(connection):
     wrsledruncache = wrmwdb.wrsledruncache_table(metadata)
     wrregioncache = wrmwdb.wrregioncache_table(metadata)
 
     wrsledruncache = wrmwdb.wrsledruncache_table(metadata)
     wrregioncache = wrmwdb.wrregioncache_table(metadata)
 
-    transaction = connection.begin()
-
-    # Delete all existing entries in wrregioncache
-    # We rely on transactions MySQL InnoDB
-    connection.execute(wrregioncache.delete())
-    
-    # Query all combinations of sledruns and regions
-    sel = select(
-        [
-            wrregion.c.id.label('region_id'),
-            sqlfunc.AsWKB(wrregion.c.border).label('border'),
-            wrsledruncache.c.page_id,
-            wrsledruncache.c.position_longitude,
-            wrsledruncache.c.position_latitude
-        ],
-        sqlfunc.contains(
-            wrregion.c.border,
-            sqlfunc.point(wrsledruncache.c.position_longitude, wrsledruncache.c.position_latitude)
+    with connection.begin():
+
+        # Delete all existing entries in wrregioncache
+        # We rely on transactions MySQL InnoDB
+        connection.execute(wrregioncache.delete())
+
+        # Query all combinations of sledruns and regions
+        sel = select(
+            [
+                wrregion.c.id.label('region_id'),
+                sqlfunc.AsWKB(wrregion.c.border).label('border'),
+                wrsledruncache.c.page_id,
+                wrsledruncache.c.position_longitude,
+                wrsledruncache.c.position_latitude
+            ],
+            sqlfunc.contains(
+                wrregion.c.border,
+                sqlfunc.point(wrsledruncache.c.position_longitude, wrsledruncache.c.position_latitude)
+            )
         )
         )
-    )
-    ins = wrregioncache.insert()
-
-    # Refill wrregioncache
-    point = ogr.Geometry(ogr.wkbPoint)
-    result = connection.execute(sel)
-    for row in result:
-        point.SetPoint(0, row.position_longitude, row.position_latitude)
-        if point.Within(ogr.CreateGeometryFromWkb(row.border)):
-            connection.execute(ins.values(region_id=row.region_id, page_id=row.page_id))
-
-    # commit
-    transaction.commit()
+        ins = wrregioncache.insert()
+
+        # Refill wrregioncache
+        point = ogr.Geometry(ogr.wkbPoint)
+        result = connection.execute(sel)
+        for row in result:
+            point.SetPoint(0, row.position_longitude, row.position_latitude)
+            if point.Within(ogr.CreateGeometryFromWkb(row.border)):
+                connection.execute(ins.values(region_id=row.region_id, page_id=row.page_id))
index cd44b719dd87e31ae04bb1c4ff707ae3aa90df42..e0d8dd185e7ad658a1e77dd4b06f804ac49eea1b 100644 (file)
@@ -43,7 +43,7 @@ def wrreport_table(metadata):
     )
 
 
     )
 
 
-def wrsledruncache_table(metadata):
+def wrsledruncache_table(metadata) -> Table:
     """Returns the sqlalchemy Table representing the "wrsledruncache" Winterrodeln table in MediaWiki.
     Current table definition
     * version 1.4 (renamed table and added column walkup_possible)
     """Returns the sqlalchemy Table representing the "wrsledruncache" Winterrodeln table in MediaWiki.
     Current table definition
     * version 1.4 (renamed table and added column walkup_possible)
index f2ea5c871ec7cee1f21a05fd42d98ba59c68004d..5ef604c6f6f168b739aa9039b989c0d976adcd09 100644 (file)
@@ -1,10 +1,12 @@
 """This module contains winterrodeln specific functions that are processing the MediaWiki markup.
 """
 import re
 """This module contains winterrodeln specific functions that are processing the MediaWiki markup.
 """
 import re
+import subprocess
 import xml.etree.ElementTree
 import collections
 import xml.etree.ElementTree
 import collections
-from typing import Tuple, Optional, List
+from typing import Tuple, Optional, List, OrderedDict, Union, Dict
 
 
+import jinja2
 from mwparserfromhell.nodes import Template
 
 import wrpylib.wrvalidators
 from mwparserfromhell.nodes import Template
 
 import wrpylib.wrvalidators
@@ -12,25 +14,40 @@ import wrpylib.mwmarkup
 import wrpylib.wrmwdb
 from wrpylib.wrvalidators import LonLat, opt_lonlat_from_str, opt_lonlat_to_str, opt_uint_from_str, opt_uint_to_str, \
     opt_str_opt_comment_enum_to_str, lift_german_to_str, webauskunft_to_str, cachet_german_to_str, \
 import wrpylib.wrmwdb
 from wrpylib.wrvalidators import LonLat, opt_lonlat_from_str, opt_lonlat_to_str, opt_uint_from_str, opt_uint_to_str, \
     opt_str_opt_comment_enum_to_str, lift_german_to_str, webauskunft_to_str, cachet_german_to_str, \
-    opt_phone_comment_enum_to_str, lift_german_from_str, GASTHAUSBOX_DICT
+    opt_phone_comment_enum_to_str, lift_german_from_str, GASTHAUSBOX_DICT, opt_difficulty_german_from_str, \
+    opt_avalanches_german_from_str, nightlightdays_from_str, opt_public_transport_german_from_str, \
+    opt_tristate_german_comment_from_str, rodelbahnbox_to_str, lonlat_to_str, opt_no_or_str_to_str, \
+    opt_no_or_str_from_str
 
 
 
 
-def sledrun_from_rodelbahnbox(value, sledrun):
+def split_lon_lat(value: Optional[LonLat]) -> Union[LonLat, Tuple[None, None]]:
+    if value is None:
+        return None, None
+    return value
+
+
+def join_lon_lat(lon: Optional[float], lat: Optional[float]) -> Optional[LonLat]:
+    if lon is None or lat is None:
+        return None
+    return LonLat(lon, lat)
+
+
+def sledrun_from_rodelbahnbox(value: OrderedDict, sledrun: object):
     """Takes a Rodelbahnbox as returned by rodelbahnbox_from_str (that is, an OrderedDict) and
     updates the sledrun instance with all values present in the Rodelbahnbox. Other values are not
     updated. Does not validate the arguments."""
     # sledrun.page_id = None # this field is not updated because it is not present in the RodelbahnBox
     # sledrun.page_title = None # this field is not updated because it is not present in the RodelbahnBox
     # sledrun.name_url = None # this field is not updated because it is not present in the RodelbahnBox
     """Takes a Rodelbahnbox as returned by rodelbahnbox_from_str (that is, an OrderedDict) and
     updates the sledrun instance with all values present in the Rodelbahnbox. Other values are not
     updated. Does not validate the arguments."""
     # sledrun.page_id = None # this field is not updated because it is not present in the RodelbahnBox
     # sledrun.page_title = None # this field is not updated because it is not present in the RodelbahnBox
     # sledrun.name_url = None # this field is not updated because it is not present in the RodelbahnBox
-    sledrun.position_longitude, sledrun.position_latitude = value['Position']
-    sledrun.top_longitude, sledrun.top_latitude = value['Position oben']
+    sledrun.position_longitude, sledrun.position_latitude = split_lon_lat(value['Position'])
+    sledrun.top_longitude, sledrun.top_latitude = split_lon_lat(value['Position oben'])
     sledrun.top_elevation = value['Höhe oben']
     sledrun.top_elevation = value['Höhe oben']
-    sledrun.bottom_longitude, sledrun.bottom_latitude = value['Position unten']
+    sledrun.bottom_longitude, sledrun.bottom_latitude = split_lon_lat(value['Position unten'])
     sledrun.bottom_elevation = value['Höhe unten']
     sledrun.length = value['Länge']
     sledrun.difficulty = value['Schwierigkeit']
     sledrun.avalanches = value['Lawinen']
     sledrun.bottom_elevation = value['Höhe unten']
     sledrun.length = value['Länge']
     sledrun.difficulty = value['Schwierigkeit']
     sledrun.avalanches = value['Lawinen']
-    sledrun.operator = value['Betreiber']
+    sledrun.operator = opt_no_or_str_to_str(value['Betreiber'])
     sledrun.public_transport = value['Öffentliche Anreise']
     sledrun.walkup_possible = value['Aufstieg möglich']
     sledrun.walkup_time = value['Gehzeit']
     sledrun.public_transport = value['Öffentliche Anreise']
     sledrun.walkup_possible = value['Aufstieg möglich']
     sledrun.walkup_time = value['Gehzeit']
@@ -55,15 +72,15 @@ def sledrun_to_rodelbahnbox(sledrun) -> collections.OrderedDict:
     """Takes a sledrun instance that might come from the database and converts it to a OrderedDict ready
     to be formatted as RodelbahnBox."""
     value = collections.OrderedDict()
     """Takes a sledrun instance that might come from the database and converts it to a OrderedDict ready
     to be formatted as RodelbahnBox."""
     value = collections.OrderedDict()
-    value['Position'] = LonLat(sledrun.position_longitude, sledrun.position_latitude)
-    value['Position oben'] = LonLat(sledrun.top_longitude, sledrun.top_latitude)
+    value['Position'] = join_lon_lat(sledrun.position_longitude, sledrun.position_latitude)
+    value['Position oben'] = join_lon_lat(sledrun.top_longitude, sledrun.top_latitude)
     value['Höhe oben'] = sledrun.top_elevation
     value['Höhe oben'] = sledrun.top_elevation
-    value['Position unten'] = LonLat(sledrun.bottom_longitude, sledrun.bottom_latitude)
+    value['Position unten'] = join_lon_lat(sledrun.bottom_longitude, sledrun.bottom_latitude)
     value['Höhe unten'] = sledrun.bottom_elevation
     value['Länge'] = sledrun.length
     value['Schwierigkeit'] = sledrun.difficulty
     value['Lawinen'] = sledrun.avalanches
     value['Höhe unten'] = sledrun.bottom_elevation
     value['Länge'] = sledrun.length
     value['Schwierigkeit'] = sledrun.difficulty
     value['Lawinen'] = sledrun.avalanches
-    value['Betreiber'] = sledrun.operator
+    value['Betreiber'] = opt_no_or_str_from_str(sledrun.operator)
     value['Öffentliche Anreise'] = sledrun.public_transport
     value['Aufstieg möglich'] = sledrun.walkup_possible
     value['Gehzeit'] = sledrun.walkup_time
     value['Öffentliche Anreise'] = sledrun.public_transport
     value['Aufstieg möglich'] = sledrun.walkup_possible
     value['Gehzeit'] = sledrun.walkup_time
@@ -91,7 +108,7 @@ def inn_from_gasthausbox(value, inn):
         if v == '':
             return None
         return v
         if v == '':
             return None
         return v
-    inn.position_longitude, inn.position_latitude = value['Position']
+    inn.position_longitude, inn.position_latitude = split_lon_lat(value['Position'])
     inn.position_elevation = value['Höhe']
     inn.operator = value['Betreiber']
     inn.seats = value['Sitzplätze']
     inn.position_elevation = value['Höhe']
     inn.operator = value['Betreiber']
     inn.seats = value['Sitzplätze']
@@ -112,10 +129,10 @@ def inn_from_gasthausbox(value, inn):
 def inn_to_gasthausbox(inn) -> collections.OrderedDict:
     """Converts an inn class to a dict of Gasthausbox properties. inn is an Inn instance."""
     def convfromdb(val, key):
 def inn_to_gasthausbox(inn) -> collections.OrderedDict:
     """Converts an inn class to a dict of Gasthausbox properties. inn is an Inn instance."""
     def convfromdb(val, key):
-        v = '' if value is None else val
+        v = '' if val is None else val
         return GASTHAUSBOX_DICT[key].from_str(v)
     value = collections.OrderedDict()
         return GASTHAUSBOX_DICT[key].from_str(v)
     value = collections.OrderedDict()
-    value['Position'] = LonLat(inn.position_longitude, inn.position_latitude)
+    value['Position'] = join_lon_lat(inn.position_longitude, inn.position_latitude)
     value['Höhe'] = inn.position_elevation
     value['Betreiber'] = inn.operator
     value['Sitzplätze'] = inn.seats
     value['Höhe'] = inn.position_elevation
     value['Betreiber'] = inn.operator
     value['Sitzplätze'] = inn.seats
@@ -132,14 +149,14 @@ def inn_to_gasthausbox(inn) -> collections.OrderedDict:
     return value
 
 
     return value
 
 
-def lonlat_ele_from_template(template) -> Tuple[LonLat, Optional[int]]:
+def lonlat_ele_from_template(template) -> Tuple[Optional[LonLat], Optional[int]]:
     """Template is a `mwparserfromhell.nodes.template.Template` instance. Returns (lonlat, ele)."""
     lonlat = opt_lonlat_from_str(template.params[0].strip())
     ele = opt_uint_from_str(template.params[1].strip())
     return lonlat, ele
 
 
     """Template is a `mwparserfromhell.nodes.template.Template` instance. Returns (lonlat, ele)."""
     lonlat = opt_lonlat_from_str(template.params[0].strip())
     ele = opt_uint_from_str(template.params[1].strip())
     return lonlat, ele
 
 
-def latlon_ele_to_template(lonlat_ele, name) -> Template:
+def latlon_ele_to_template(lonlat_ele: Tuple[Optional[LonLat], Optional[int]], name: str) -> Template:
     lonlat, ele = lonlat_ele
     template = Template(name)
     template.add(1, opt_lonlat_to_str(lonlat))
     lonlat, ele = lonlat_ele
     template = Template(name)
     template.add(1, opt_lonlat_to_str(lonlat))
@@ -148,6 +165,19 @@ def latlon_ele_to_template(lonlat_ele, name) -> Template:
     return template
 
 
     return template
 
 
+def lonlat_to_json(lonlat: LonLat) -> dict:
+    return {'longitude': lonlat.lon, 'latitude': lonlat.lat}
+
+
+def lonlat_ele_to_json(lonlat: Optional[LonLat], ele: Optional[int]) -> dict:
+    result = {}
+    if lonlat is not None:
+        result['position'] = lonlat_to_json(lonlat)
+    if ele is not None:
+        result['elevation'] = ele
+    return result
+
+
 class ParseError(RuntimeError):
     """Exception used by some of the functions"""
     pass
 class ParseError(RuntimeError):
     """Exception used by some of the functions"""
     pass
@@ -285,7 +315,7 @@ def create_wrmap_coordinates(coords):
     return '\n'.join(result)
  
 
     return '\n'.join(result)
  
 
-def create_wrmap(geojson):
+def create_wrmap(geojson: Dict) -> str:
     """Creates a <wrmap> wikitext from geojson (as python types)."""
     wrmap_xml = xml.etree.ElementTree.Element('wrmap')
     wrmap_xml.text = '\n\n'
     """Creates a <wrmap> wikitext from geojson (as python types)."""
     wrmap_xml = xml.etree.ElementTree.Element('wrmap')
     wrmap_xml.text = '\n\n'
@@ -316,3 +346,141 @@ def create_wrmap(geojson):
     if last_json_feature is not None:
         last_json_feature.tail = '\n\n'
     return xml.etree.ElementTree.tostring(wrmap_xml, encoding='utf-8').decode('utf-8')
     if last_json_feature is not None:
         last_json_feature.tail = '\n\n'
     return xml.etree.ElementTree.tostring(wrmap_xml, encoding='utf-8').decode('utf-8')
+
+
+def markdown_to_mediawiki(markdown: str) -> str:
+    return subprocess.check_output(['pandoc', '--to', 'mediawiki'], input=markdown, encoding='utf-8')
+
+
+def german_bool(value: Union[bool, jinja2.Undefined]) -> Union[str, jinja2.Undefined]:
+    if jinja2.is_undefined(value):
+        return value
+    return wrpylib.wrvalidators.bool_german_to_str(value)
+
+
+class Jinja2Tools:
+    def create_wrmap(self, geojson: Dict) -> str:
+        return create_wrmap(geojson)
+
+    def json_position(self, value: dict) -> str:
+        lon_lat = LonLat(value['longitude'], value['latitude'])
+        return lonlat_to_str(lon_lat)
+
+    def json_pos_ele_position(self, value: dict) -> str:
+        pos = value.get('position')
+        if pos is None:
+            return ''
+        return self.json_position(pos)
+
+    def json_pos_ele_elevation(self, value: dict) -> str:
+        return value.get('elevation', '')
+
+    def list_template(self, name: str, value: List[str]) -> str:
+        return str(wrpylib.mwmarkup.create_template(name, value))
+
+    def json_template(self, value) -> str:
+        args = []
+        kwargs = {}
+        for p in value.get('parameter', []):
+            v = p.get('value', '')
+            if 'key' in p:
+                kwargs[p['key']] = v
+            else:
+                args.append(v)
+        return str(wrpylib.mwmarkup.create_template(value['name'], args, kwargs))
+
+
+def create_sledrun_wiki(sledrun_json: Dict, map_json: Optional[Dict]) -> str:
+    env = jinja2.Environment(
+        loader=jinja2.PackageLoader("wrpylib"),
+        autoescape=jinja2.select_autoescape(),
+    )
+    env.filters["from_markdown"] = markdown_to_mediawiki
+    env.filters["german_bool"] = german_bool
+    template = env.get_template("sledrun_wiki.txt")
+
+    def position_to_lon_lat(value: Optional[dict]) -> Optional[LonLat]:
+        if value is not None:
+            lon = value.get('longitude')
+            lat = value.get('latitude')
+            if lon is not None and lat is not None:
+                return LonLat(lon, lat)
+        return None
+
+    def position_ele_to_lon_lat(value: Optional[dict]) -> Optional[LonLat]:
+        if value is not None:
+            return position_to_lon_lat(value.get("position"))
+        return None
+
+    def position_ele_to_ele(value: Optional[dict]) -> Optional[int]:
+        if value is not None:
+            ele = value.get('elevation')
+            if ele is not None:
+                return int(ele)
+        return None
+
+    def aufstiegshilfe() -> Optional[List[Tuple[str, Optional[str]]]]:
+        ws = sledrun_json.get('walkup_supports')
+        if ws is None:
+            return None
+        return [(w['type'], w.get('comment')) for w in ws]
+
+    def rodelverleih() -> Optional[List[Tuple[str, Optional[str]]]]:
+        sr = sledrun_json.get('sled_rental_direct')
+        if sr is None:
+            return None
+        return [('Ja', None)] if sr else []
+
+    def webauskunft() -> Tuple[Optional[bool], Optional[str]]:
+        info_web = sledrun_json.get('info_web')
+        if info_web is None:
+            return None, None
+        if len(info_web) == 0:
+            return False, None
+        return True, info_web[0]['url']
+
+    def telefonauskunft() -> Optional[List[Tuple[str, str]]]:
+        info_phone = sledrun_json.get('info_phone')
+        if info_phone is None:
+            return None
+        return [(pc['phone'], pc['name']) for pc in info_phone]
+
+    def betreiber() -> str:
+        has_operator = sledrun_json.get('has_operator')
+        if has_operator is None:
+            return sledrun_json.get('operator')
+        if has_operator:
+            return sledrun_json.get('operator')
+        return 'Nein'
+
+    sledrun_rbb_json = collections.OrderedDict([
+        ('Position', position_to_lon_lat(sledrun_json.get('position'))),
+        ('Position oben', position_ele_to_lon_lat(sledrun_json.get('top'))),
+        ('Höhe oben', position_ele_to_ele(sledrun_json.get('top'))),
+        ('Position unten', position_ele_to_lon_lat(sledrun_json.get('bottom'))),
+        ('Höhe unten', position_ele_to_ele(sledrun_json.get('bottom'))),
+        ('Länge', sledrun_json.get('length')),
+        ('Schwierigkeit', opt_difficulty_german_from_str(sledrun_json.get('difficulty', ''))),
+        ('Lawinen', opt_avalanches_german_from_str(sledrun_json.get('avalanches', ''))),
+        ('Betreiber', (sledrun_json.get('has_operator'), sledrun_json.get('operator'))),
+        ('Öffentliche Anreise', opt_public_transport_german_from_str(sledrun_json.get('public_transport', ''))),
+        ('Aufstieg möglich', sledrun_json.get('walkup_possible')),
+        ('Aufstieg getrennt', opt_tristate_german_comment_from_str(sledrun_json.get('walkup_separate', ''))),
+        ('Gehzeit', sledrun_json.get('walkup_time')),
+        ('Aufstiegshilfe', aufstiegshilfe()),
+        ('Beleuchtungsanlage', opt_tristate_german_comment_from_str(sledrun_json.get('nightlight_possible', ''))),
+        ('Beleuchtungstage', nightlightdays_from_str(sledrun_json.get('nightlight_weekdays', ''))),
+        ('Rodelverleih', rodelverleih()),
+        ('Gütesiegel', None),
+        ('Webauskunft', webauskunft()),
+        ('Telefonauskunft', telefonauskunft()),
+        ('Bild', sledrun_json.get('image')),
+        ('In Übersichtskarte', sledrun_json.get('show_in_overview')),
+        ('Forumid', sledrun_json.get('forum_id'))
+    ])
+
+    rodelbahnbox = rodelbahnbox_to_str(sledrun_rbb_json)
+
+    return template.render(sledrun_json=sledrun_json,
+                           rodelbahnbox=rodelbahnbox,
+                           map_json=map_json, h=Jinja2Tools(), **sledrun_json)
index f48bb30f5db1d03bf9549f5b7a3bd1bebe140747..61b141568a29a4fc78af906086848463d18db1a7 100644 (file)
@@ -15,6 +15,7 @@ from email.errors import HeaderParseError
 from typing import Tuple, Optional, List, Callable, Union, TypeVar, Dict, NamedTuple
 
 import mwparserfromhell  # https://github.com/earwig/mwparserfromhell
 from typing import Tuple, Optional, List, Callable, Union, TypeVar, Dict, NamedTuple
 
 import mwparserfromhell  # https://github.com/earwig/mwparserfromhell
+from mwparserfromhell.nodes import Template
 
 from wrpylib.mwmarkup import format_template_table
 
 
 from wrpylib.mwmarkup import format_template_table
 
@@ -763,7 +764,7 @@ class ValueErrorList(ValueError):
     pass
 
 
     pass
 
 
-def wikibox_from_template(template, converter_dict):
+def wikibox_from_template(template: Template, converter_dict: dict) -> dict:
     """Returns an ordered dict."""
     result = OrderedDict()
     exceptions_dict = OrderedDict()
     """Returns an ordered dict."""
     result = OrderedDict()
     exceptions_dict = OrderedDict()
@@ -785,14 +786,14 @@ def wikibox_from_template(template, converter_dict):
     return result
 
 
     return result
 
 
-def wikibox_to_template(value, name, converter_dict):
-    template = mwparserfromhell.nodes.template.Template(name)
+def wikibox_to_template(value: dict, name: str, converter_dict: dict) -> Template:
+    template = Template(name)
     for key, converter in converter_dict.items():
         template.add(key, converter.to_str(value[key]))
     return template
 
 
     for key, converter in converter_dict.items():
         template.add(key, converter.to_str(value[key]))
     return template
 
 
-def template_from_str(value, name):
+def template_from_str(value: str, name: str) -> Template:
     wikicode = mwparserfromhell.parse(value)
     template_list = wikicode.filter_templates(recursive=False, matches=lambda t: t.name.strip() == name)
     if len(template_list) == 0:
     wikicode = mwparserfromhell.parse(value)
     template_list = wikicode.filter_templates(recursive=False, matches=lambda t: t.name.strip() == name)
     if len(template_list) == 0:
@@ -802,12 +803,12 @@ def template_from_str(value, name):
     return template_list[0]
 
 
     return template_list[0]
 
 
-def wikibox_from_str(value, name, converter_dict):
+def wikibox_from_str(value: str, name: str, converter_dict: dict) -> dict:
     template = template_from_str(value, name)
     return wikibox_from_template(template, converter_dict)
 
 
     template = template_from_str(value, name)
     return wikibox_from_template(template, converter_dict)
 
 
-def wikibox_to_str(value, name, converter_dict):
+def wikibox_to_str(value: dict, name: str, converter_dict: dict) -> str:
     return str(wikibox_to_template(value, name, converter_dict))
 
 
     return str(wikibox_to_template(value, name, converter_dict))
 
 
@@ -826,7 +827,7 @@ RODELBAHNBOX_DICT = OrderedDict([
     ('Länge', opt_uint_converter),  # 3500
     ('Schwierigkeit', opt_difficulty_german_converter),  # 'mittel'
     ('Lawinen', opt_avalanches_german_converter),  # 'kaum'
     ('Länge', opt_uint_converter),  # 3500
     ('Schwierigkeit', opt_difficulty_german_converter),  # 'mittel'
     ('Lawinen', opt_avalanches_german_converter),  # 'kaum'
-    ('Betreiber', opt_str_converter),  # 'Max Mustermann'
+    ('Betreiber', opt_no_or_str_converter),  # 'Max Mustermann'
     ('Öffentliche Anreise', opt_public_transport_german_converter),  # 'Mittelmäßig'
     ('Aufstieg möglich', opt_bool_german_converter),  # 'Ja'
     ('Aufstieg getrennt', opt_tristate_german_comment_converter),  # 'Ja'
     ('Öffentliche Anreise', opt_public_transport_german_converter),  # 'Mittelmäßig'
     ('Aufstieg möglich', opt_bool_german_converter),  # 'Ja'
     ('Aufstieg getrennt', opt_tristate_german_comment_converter),  # 'Ja'
@@ -844,21 +845,21 @@ RODELBAHNBOX_DICT = OrderedDict([
 ])
 
 
 ])
 
 
-def rodelbahnbox_from_template(template):
+def rodelbahnbox_from_template(template: Template) -> dict:
     """Returns an ordered dict."""
     return wikibox_from_template(template, RODELBAHNBOX_DICT)
 
 
     """Returns an ordered dict."""
     return wikibox_from_template(template, RODELBAHNBOX_DICT)
 
 
-def rodelbahnbox_to_template(value):
+def rodelbahnbox_to_template(value: dict) -> Template:
     return wikibox_to_template(value, RODELBAHNBOX_TEMPLATE_NAME, RODELBAHNBOX_DICT)
 
 
     return wikibox_to_template(value, RODELBAHNBOX_TEMPLATE_NAME, RODELBAHNBOX_DICT)
 
 
-def rodelbahnbox_from_str(value):
+def rodelbahnbox_from_str(value: str) -> Dict:
     """Returns an ordered dict."""
     return wikibox_from_str(value, RODELBAHNBOX_TEMPLATE_NAME, RODELBAHNBOX_DICT)
 
 
     """Returns an ordered dict."""
     return wikibox_from_str(value, RODELBAHNBOX_TEMPLATE_NAME, RODELBAHNBOX_DICT)
 
 
-def rodelbahnbox_to_str(value):
+def rodelbahnbox_to_str(value: Dict) -> str:
     template = rodelbahnbox_to_template(value)
     format_template_table(template, 20)
     return str(template)
     template = rodelbahnbox_to_template(value)
     format_template_table(template, 20)
     return str(template)
@@ -886,21 +887,21 @@ GASTHAUSBOX_DICT = OrderedDict([
     ('Rodelbahnen', opt_wikipage_enum_converter)])
 
 
     ('Rodelbahnen', opt_wikipage_enum_converter)])
 
 
-def gasthausbox_from_template(template):
+def gasthausbox_from_template(template: Template) -> dict:
     """Returns an ordered dict."""
     return wikibox_from_template(template, GASTHAUSBOX_DICT)
 
 
     """Returns an ordered dict."""
     return wikibox_from_template(template, GASTHAUSBOX_DICT)
 
 
-def gasthausbox_to_template(value):
+def gasthausbox_to_template(value: dict) -> Template:
     return wikibox_to_template(value, GASTHAUSBOX_TEMPLATE_NAME, GASTHAUSBOX_DICT)
 
 
     return wikibox_to_template(value, GASTHAUSBOX_TEMPLATE_NAME, GASTHAUSBOX_DICT)
 
 
-def gasthausbox_from_str(value):
+def gasthausbox_from_str(value: str) -> dict:
     """Returns an ordered dict."""
     return wikibox_from_str(value, GASTHAUSBOX_TEMPLATE_NAME, GASTHAUSBOX_DICT)
 
 
     """Returns an ordered dict."""
     return wikibox_from_str(value, GASTHAUSBOX_TEMPLATE_NAME, GASTHAUSBOX_DICT)
 
 
-def gasthausbox_to_str(value):
+def gasthausbox_to_str(value: dict) -> str:
     template = gasthausbox_to_template(value)
     format_template_table(template, 17)
     return str(template)
     template = gasthausbox_to_template(value)
     format_template_table(template, 17)
     return str(template)
@@ -909,7 +910,7 @@ def gasthausbox_to_str(value):
 # Helper function to make page title pretty
 # -----------------------------------------
 
 # Helper function to make page title pretty
 # -----------------------------------------
 
-def sledrun_page_title_to_pretty_url(page_title):
+def sledrun_page_title_to_pretty_url(page_title: str) -> str:
     """Converts a page_title from the page_title column of wrsledruncache to name_url.
     name_url is not used by MediaWiki but by new applications like wrweb."""
     return page_title.lower().replace(' ', '-').replace('_', '-').replace('(', '').replace(')', '')
     """Converts a page_title from the page_title column of wrsledruncache to name_url.
     name_url is not used by MediaWiki but by new applications like wrweb."""
     return page_title.lower().replace(' ', '-').replace('_', '-').replace('(', '').replace(')', '')