scripts/get-external-data.py

#!/usr/bin/env python3
'''This script is designed to load quasi-static data into a PostGIS database
for rendering maps. It differs from the usual scripts for this task in that it
takes its configuration from a file rather than being a series of shell
commands.

Some implicit assumptions are
- Time spent querying (rendering) the data is more valuable than the one-time
  cost of loading it
- The script will not be run multiple times in parallel. This is unlikely in
  practice because the script is expected to be called daily or less often,
  not every minute.
- Usage patterns will be similar to typical map rendering
'''
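# A minimal sketch of the configuration file this script expects (by default
# external-data.yml). The layout is inferred from the keys read in main()
# below; the source name, URL, file names and setting values are hypothetical
# examples, not taken from any real configuration.
#
#   settings:
#     temp_schema: loading
#     schema: public
#     metadata_table: external_data
#     data_dir: data
#     database: gis
#   sources:
#     example_polygons:
#       url: https://example.com/example_polygons.zip
#       archive:
#         format: zip
#         files:
#           - example_polygons.shp
#           - example_polygons.shx
#           - example_polygons.dbf
#           - example_polygons.prj
#       file: example_polygons.shp
#       ogropts: ["-t_srs", "EPSG:3857"]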

import yaml
import os
import re
import argparse
import shutil

# modules for getting data
import zipfile
import requests
import io

# modules for converting and postgres loading
import subprocess
import psycopg2

import logging


# Create the schema that tables are loaded into and the metadata table that
# tracks when each external table was last modified
def database_setup(conn, temp_schema, schema, metadata_table):
    with conn.cursor() as cur:
        cur.execute('''CREATE SCHEMA IF NOT EXISTS {temp_schema};'''
                    .format(temp_schema=temp_schema))
        cur.execute(('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
                     ''' (name text primary key, last_modified text);''')
                    .format(schema=schema, metadata_table=metadata_table))
        conn.commit()


# Handles loading, indexing, and metadata tracking for one external table
class Table:
    def __init__(self, name, conn, temp_schema, schema, metadata_table):
        self._name = name
        self._conn = conn
        self._temp_schema = temp_schema
        self._dst_schema = schema
        self._metadata_table = metadata_table

    # Clean up the temporary schema in preparation for loading
    def clean_temp(self):
        with self._conn.cursor() as cur:
            cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            self._conn.commit()

    # Get the last modified date from the metadata table
    def last_modified(self):
        with self._conn.cursor() as cur:
            cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table), [self._name])
            results = cur.fetchone()
            if results is not None:
                return results[0]

    def index(self):
        with self._conn.cursor() as cur:
            # Disable autovacuum while manipulating the table, since it'll get clustered towards the end.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # ogr creates an ogc_fid column we don't need
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
                        .format(name=self._name, temp_schema=self._temp_schema))

            # Null geometries are useless for rendering
            cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # Sorting static tables helps rendering performance and reclaims the
            # space freed by the column drop above
            cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
                         '''(ST_Envelope(way));'''
                         '''CLUSTER "{temp_schema}"."{name}" '''
                         '''USING "{name}_order";'''
                         '''DROP INDEX "{temp_schema}"."{name}_order";'''
                         '''CREATE INDEX ON "{temp_schema}"."{name}" '''
                         '''USING GIST (way) WITH (fillfactor=100);''')
                        .format(name=self._name, temp_schema=self._temp_schema))
            # Reset autovacuum. The table is static, so this doesn't really
            # matter since it'll never need a vacuum.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            self._conn.commit()

        # VACUUM can't be run in a transaction, so autocommit needs to be turned on
        old_autocommit = self._conn.autocommit
        try:
            self._conn.autocommit = True
            with self._conn.cursor() as cur:
                cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
                            .format(name=self._name, temp_schema=self._temp_schema))
        finally:
            self._conn.autocommit = old_autocommit

    # Move the freshly loaded table into the destination schema and record its
    # new Last-Modified value in the metadata table
    def replace(self, new_last_modified):
        with self._conn.cursor() as cur:
            cur.execute('''BEGIN;''')
            cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
                         '''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
                        .format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))

            # Insert a metadata row for this table if it doesn't have one yet,
            # otherwise update the existing row
            cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                        [self._name])
            if cur.rowcount == 0:
                cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
                             '''(name, last_modified) VALUES (%s, %s)''')
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [self._name, new_last_modified])
            else:
                cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [new_last_modified, self._name])
            self._conn.commit()


def main():
    # parse options
    parser = argparse.ArgumentParser(description="Load external data into a database")

    parser.add_argument("-f", "--force", action="store_true", help="Download new data, even if not required")

    parser.add_argument("-c", "--config", action="store", default="external-data.yml",
                        help="Name of configuration file (default external-data.yml)")
    parser.add_argument("-D", "--data", action="store", help="Override data download directory")

    parser.add_argument("-d", "--database", action="store", help="Override database name to connect to")
    parser.add_argument("-H", "--host", action="store",
                        help="Override database server host or socket directory")
    parser.add_argument("-p", "--port", action="store", help="Override database server port")
    parser.add_argument("-U", "--username", action="store", help="Override database user name")
    parser.add_argument("-v", "--verbose", action="store_true", help="Be more verbose. Overrides -q")
    parser.add_argument("-q", "--quiet", action="store_true", help="Only report serious problems")

    opts = parser.parse_args()
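    # Example invocation (a sketch; the database name "gis" and the user
    # "renderer" are placeholder values, and external-data.yml is simply the
    # default config path defined above):
    #   scripts/get-external-data.py -c external-data.yml -d gis -U renderer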

    if opts.verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    with open(opts.config) as config_file:
        config = yaml.safe_load(config_file)
        data_dir = opts.data or config["settings"]["data_dir"]
        os.makedirs(data_dir, exist_ok=True)

        # If a DB option is specified neither on the command line nor in the
        # config file, it is passed to psycopg2 as None and libpq falls back
        # to its defaults
        database = opts.database or config["settings"].get("database")
        host = opts.host or config["settings"].get("host")
        port = opts.port or config["settings"].get("port")
        user = opts.username or config["settings"].get("username")
        with requests.Session() as s, \
             psycopg2.connect(database=database,
                              host=host, port=port,
                              user=user) as conn:

            s.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})

            # DB setup
            database_setup(conn, config["settings"]["temp_schema"],
                           config["settings"]["schema"],
                           config["settings"]["metadata_table"])

            for name, source in config["sources"].items():
                logging.info("Checking table {}".format(name))
                # Don't attempt to handle strange names.
                # Even if there were code here to escape them properly, you
                # wouldn't want names like that in a style, with all the
                # quoting headaches they bring
                if not re.match('''^[a-zA-Z0-9_]+$''', name):
                    raise RuntimeError("Only ASCII alphanumeric table names are supported")

                workingdir = os.path.join(data_dir, name)
                # Clean up anything left over from an aborted run
                shutil.rmtree(workingdir, ignore_errors=True)

                os.makedirs(workingdir, exist_ok=True)

                this_table = Table(name, conn,
                                   config["settings"]["temp_schema"],
                                   config["settings"]["schema"],
                                   config["settings"]["metadata_table"])
                this_table.clean_temp()

                # Send If-Modified-Since so an unchanged source can answer
                # with 304 Not Modified instead of the full download
                if not opts.force:
                    headers = {'If-Modified-Since': this_table.last_modified()}
                else:
                    headers = {}

                download = s.get(source["url"], headers=headers)
                download.raise_for_status()

                if download.status_code == 200:
                    if "Last-Modified" in download.headers:
                        new_last_modified = download.headers["Last-Modified"]
                    else:
                        new_last_modified = None
                    if "archive" in source and source["archive"]["format"] == "zip":
                        zip_archive = zipfile.ZipFile(io.BytesIO(download.content))
                        for member in source["archive"]["files"]:
                            zip_archive.extract(member, workingdir)

                    ogrpg = "PG:dbname={}".format(database)

                    if port is not None:
                        ogrpg = ogrpg + " port={}".format(port)
                    if user is not None:
                        ogrpg = ogrpg + " user={}".format(user)
                    if host is not None:
                        ogrpg = ogrpg + " host={}".format(host)

                    ogrcommand = ["ogr2ogr",
                                  '-f', 'PostgreSQL',
                                  '-lco', 'GEOMETRY_NAME=way',
                                  '-lco', 'SPATIAL_INDEX=FALSE',
                                  '-lco', 'EXTRACT_SCHEMA_FROM_LAYER_NAME=YES',
                                  '-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]

                    if "ogropts" in source:
                        ogrcommand += source["ogropts"]

                    ogrcommand += [ogrpg, os.path.join(workingdir, source["file"])]

                    logging.debug("running {}".format(subprocess.list2cmdline(ogrcommand)))

                    # check_output raises CalledProcessError if ogr2ogr fails,
                    # so catch it to log the details
                    try:
                        subprocess.check_output(ogrcommand, stderr=subprocess.PIPE, universal_newlines=True)
                    except subprocess.CalledProcessError as e:
                        # Add more detail to the logs
                        logging.critical("ogr2ogr returned {} with layer {}".format(e.returncode, name))
                        logging.critical("Command line was {}".format(subprocess.list2cmdline(e.cmd)))
                        logging.critical("Output was\n{}".format(e.output))
                        raise RuntimeError("ogr2ogr error when loading table {}".format(name))

                    this_table.index()
                    this_table.replace(new_last_modified)
                else:
                    logging.info("Table {} did not require updating".format(name))


if __name__ == '__main__':
    main()
