scripts/get-external-data.py

changeset 0:b0eb3af2f9ee
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scripts/get-external-data.py	Mon Sep 14 22:55:45 2020 +0300
@@ -0,0 +1,258 @@
+#!/usr/bin/env python3
+'''This script is designed to load quasi-static data into a PostGIS database
+for rendering maps. It differs from the usual scripts for this task in that it
+takes its configuration from a file rather than being a series of shell
+commands.
+
+Some implicit assumptions are
+- Time spent querying (rendering) the data is more valuable than the one-time
+    cost of loading it
+- The script will not be running multiple times in parallel. This is not
+    normally a concern because the script is likely to be called daily or
+    less often, not minutely.
+- Usage patterns will be similar to typical map rendering
+'''
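+
+# Typical invocations (illustrative database name):
+#   scripts/get-external-data.py                # use settings from external-data.yml
+#   scripts/get-external-data.py -f -d gis -v   # force a re-download into database "gis"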
+
+import yaml
+import os
+import re
+import argparse
+import shutil
+
+# modules for getting data
+import zipfile
+import requests
+import io
+
+# modules for converting and postgres loading
+import subprocess
+import psycopg2
+
+import logging
+
+
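+# Create the temporary loading schema and the metadata table that records each
+# external table's Last-Modified value between runs.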
+def database_setup(conn, temp_schema, schema, metadata_table):
+    with conn.cursor() as cur:
+        cur.execute('''CREATE SCHEMA IF NOT EXISTS {temp_schema};'''
+                    .format(temp_schema=temp_schema))
+        cur.execute(('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
+                     ''' (name text primary key, last_modified text);''')
+                    .format(schema=schema, metadata_table=metadata_table))
+    conn.commit()
+
+
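+# A Table wraps one external dataset: it is loaded into the temporary schema,
+# indexed, and then swapped into the destination schema, with its
+# Last-Modified value tracked in the metadata table.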
+class Table:
+    def __init__(self, name, conn, temp_schema, schema, metadata_table):
+        self._name = name
+        self._conn = conn
+        self._temp_schema = temp_schema
+        self._dst_schema = schema
+        self._metadata_table = metadata_table
+
+    # Clean up the temporary schema in preparation for loading
+    def clean_temp(self):
+        with self._conn.cursor() as cur:
+            cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
+                        .format(name=self._name, temp_schema=self._temp_schema))
+        self._conn.commit()
+
+    # get the last modified date from the metadata table
+    def last_modified(self):
+        with self._conn.cursor() as cur:
+            cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
+                        .format(schema=self._dst_schema, metadata_table=self._metadata_table), [self._name])
+            results = cur.fetchone()
+            if results is not None:
+                return results[0]
+
+    def index(self):
+        with self._conn.cursor() as cur:
+            # Disable autovacuum while manipulating the table, since it'll get clustered towards the end.
+            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
+                        .format(name=self._name, temp_schema=self._temp_schema))
+            # ogr2ogr creates an ogc_fid column we don't need
+            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
+                        .format(name=self._name, temp_schema=self._temp_schema))
+
+            # Null geometries are useless for rendering
+            cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
+                        .format(name=self._name, temp_schema=self._temp_schema))
+            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
+                        .format(name=self._name, temp_schema=self._temp_schema))
+            # sorting static tables helps performance and reduces size from the column drop above
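+            # Clustering on ST_Envelope(way) rewrites the rows in spatial
+            # order, so rendering queries for nearby features touch fewer
+            # pages; the ordering index is only needed for CLUSTER and is
+            # dropped straight afterwards.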
+            cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
+                         '''(ST_Envelope(way));'''
+                         '''CLUSTER "{temp_schema}"."{name}" '''
+                         '''USING "{name}_order";'''
+                         '''DROP INDEX "{temp_schema}"."{name}_order";'''
+                         '''CREATE INDEX ON "{temp_schema}"."{name}" '''
+                         '''USING GIST (way) WITH (fillfactor=100);''')
+                        .format(name=self._name, temp_schema=self._temp_schema))
+            # Reset autovacuum. The table is static, so this doesn't really
+            # matter since it'll never need a vacuum.
+            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
+                        .format(name=self._name, temp_schema=self._temp_schema))
+        self._conn.commit()
+
+        # VACUUM can't be run in a transaction, so autocommit needs to be turned on
+        old_autocommit = self._conn.autocommit
+        try:
+            self._conn.autocommit = True
+            with self._conn.cursor() as cur:
+                cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
+                            .format(name=self._name, temp_schema=self._temp_schema))
+        finally:
+            self._conn.autocommit = old_autocommit
+
+    def replace(self, new_last_modified):
+        with self._conn.cursor() as cur:
+            cur.execute('''BEGIN;''')
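+            # The DROP and the schema move run in a single transaction, so
+            # concurrent readers see either the old table or the new one,
+            # never neither.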
+            cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
+                         '''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
+                        .format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))
+
+            # database_setup() guaranteed the metadata table exists; check
+            # whether it already has a row for this table
+            cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
+                        .format(schema=self._dst_schema, metadata_table=self._metadata_table),
+                        [self._name])
+            if cur.rowcount == 0:
+                cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
+                            '''(name, last_modified) VALUES (%s, %s)''')
+                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
+                            [self._name, new_last_modified])
+            else:
+                cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
+                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
+                            [new_last_modified, self._name])
+        self._conn.commit()
+
+
+def main():
+    # parse options
+    parser = argparse.ArgumentParser(description="Load external data into a database")
+
+    parser.add_argument("-f", "--force", action="store_true", help="Download new data, even if not required")
+
+    parser.add_argument("-c", "--config", action="store", default="external-data.yml",
+                        help="Name of configuration file (default external-data.yml)")
+    parser.add_argument("-D", "--data", action="store", help="Override data download directory")
+
+    parser.add_argument("-d", "--database", action="store", help="Override database name to connect to")
+    parser.add_argument("-H", "--host", action="store",
+                        help="Override database server host or socket directory")
+    parser.add_argument("-p", "--port", action="store", help="Override database server port")
+    parser.add_argument("-U", "--username", action="store", help="Override database user name")
+    parser.add_argument("-v", "--verbose", action="store_true", help="Be more verbose. Overrides -q")
+    parser.add_argument("-q", "--quiet", action="store_true", help="Only report serious problems")
+
+    opts = parser.parse_args()
+
+    if opts.verbose:
+        logging.basicConfig(level=logging.DEBUG)
+    elif opts.quiet:
+        logging.basicConfig(level=logging.WARNING)
+    else:
+        logging.basicConfig(level=logging.INFO)
+
+    with open(opts.config) as config_file:
+        config = yaml.safe_load(config_file)
+        data_dir = opts.data or config["settings"]["data_dir"]
+        os.makedirs(data_dir, exist_ok=True)
+
+        # If the DB options are unspecified both on the command line and in
+        # the config file, libpq will pick what to use with the None values
+        database = opts.database or config["settings"].get("database")
+        host = opts.host or config["settings"].get("host")
+        port = opts.port or config["settings"].get("port")
+        user = opts.username or config["settings"].get("username")
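+
+        # For reference, a configuration exercising the keys this script reads
+        # might look like the sketch below (illustrative names and URL, not a
+        # real source):
+        #
+        #   settings:
+        #     temp_schema: loading
+        #     schema: public
+        #     metadata_table: external_data
+        #     data_dir: data
+        #     database: gis
+        #   sources:
+        #     water_polygons:
+        #       url: https://example.com/water-polygons.zip
+        #       file: water-polygons/water_polygons.shp
+        #       archive:
+        #         format: zip
+        #         files:
+        #           - water-polygons/water_polygons.shp
+        #       ogropts: ["-t_srs", "EPSG:3857"]
+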
+        with requests.Session() as s, \
+            psycopg2.connect(database=database,
+                             host=host, port=port,
+                             user=user) as conn:
+
+            s.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})
+
+            # DB setup
+            database_setup(conn, config["settings"]["temp_schema"],
+                           config["settings"]["schema"],
+                           config["settings"]["metadata_table"])
+
+            for name, source in config["sources"].items():
+                logging.info("Checking table {}".format(name))
+                # Don't attempt to handle strange names
+                # Even if there were code to escape them properly here, you
+                # don't want them in a style with all the quoting headaches
+                if not re.match('''^[a-zA-Z0-9_]+$''', name):
+                    raise RuntimeError("Only ASCII alphanumeric table names are supported")
+
+                workingdir = os.path.join(data_dir, name)
+                # Clean up anything left over from an aborted run
+                shutil.rmtree(workingdir, ignore_errors=True)
+
+                os.makedirs(workingdir, exist_ok=True)
+
+                this_table = Table(name, conn,
+                                   config["settings"]["temp_schema"],
+                                   config["settings"]["schema"],
+                                   config["settings"]["metadata_table"])
+                this_table.clean_temp()
+
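+                # Issue a conditional GET: the Last-Modified value saved in
+                # the metadata table becomes If-Modified-Since, so an
+                # unchanged source answers 304 and no data is transferred.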
+                if not opts.force:
+                    headers = {'If-Modified-Since': this_table.last_modified()}
+                else:
+                    headers = {}
+
+                download = s.get(source["url"], headers=headers)
+                download.raise_for_status()
+
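+                # A 200 means new data arrived; a 304 Not Modified skips
+                # straight to the logging branch below and the existing
+                # table is kept.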
+                if download.status_code == 200:
+                    if "Last-Modified" in download.headers:
+                        new_last_modified = download.headers["Last-Modified"]
+                    else:
+                        new_last_modified = None
+                    if "archive" in source and source["archive"]["format"] == "zip":
+                        zip_file = zipfile.ZipFile(io.BytesIO(download.content))
+                        for member in source["archive"]["files"]:
+                            zip_file.extract(member, workingdir)
+
+                    ogrpg = "PG:dbname={}".format(database)
+
+                    if port is not None:
+                        ogrpg = ogrpg + " port={}".format(port)
+                    if user is not None:
+                        ogrpg = ogrpg + " user={}".format(user)
+                    if host is not None:
+                        ogrpg = ogrpg + " host={}".format(host)
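+                    # With illustrative values the result is something like
+                    # "PG:dbname=gis port=5432 user=osm host=localhost";
+                    # the omitted parts fall back to libpq defaults.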
+
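+                    # The geometry column is named "way", which the SQL in
+                    # Table.index() expects. Spatial index creation is
+                    # disabled because index() builds its own GIST index
+                    # after clustering.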
+                    ogrcommand = ["ogr2ogr",
+                                  '-f', 'PostgreSQL',
+                                  '-lco', 'GEOMETRY_NAME=way',
+                                  '-lco', 'SPATIAL_INDEX=FALSE',
+                                  '-lco', 'EXTRACT_SCHEMA_FROM_LAYER_NAME=YES',
+                                  '-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]
+
+                    if "ogropts" in source:
+                        ogrcommand += source["ogropts"]
+
+                    ogrcommand += [ogrpg, os.path.join(workingdir, source["file"])]
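+
+                    # With illustrative values the assembled command resembles:
+                    #   ogr2ogr -f PostgreSQL -lco GEOMETRY_NAME=way \
+                    #       -lco SPATIAL_INDEX=FALSE \
+                    #       -lco EXTRACT_SCHEMA_FROM_LAYER_NAME=YES \
+                    #       -nln loading.water_polygons PG:dbname=gis \
+                    #       data/water_polygons/water_polygons.shp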
+
+                    logging.debug("running {}".format(subprocess.list2cmdline(ogrcommand)))
+
+                    # ogr2ogr can raise errors here, so they need to be caught
+                    try:
+                        subprocess.check_output(ogrcommand, stderr=subprocess.STDOUT, universal_newlines=True)
+                    except subprocess.CalledProcessError as e:
+                        # Log the captured output in full before raising a
+                        # summary error
+                        logging.critical("ogr2ogr returned {} with layer {}".format(e.returncode, name))
+                        logging.critical("Command line was {}".format(subprocess.list2cmdline(e.cmd)))
+                        logging.critical("Output was\n{}".format(e.output))
+                        raise RuntimeError("ogr2ogr error when loading table {}".format(name))
+
+                    this_table.index()
+                    this_table.replace(new_last_modified)
+                else:
+                    logging.info("Table {} did not require updating".format(name))
+
+
+if __name__ == '__main__':
+    main()
