|
1 #!/usr/bin/env python3 |
|
2 '''This script is designed to load quasi-static data into a PostGIS database |
|
3 for rendering maps. It differs from the usual scripts to do this in that it is |
|
4 designed to take its configuration from a file rather than be a series of shell |
|
5 commands. |
|
6 |
|
7 Some implicit assumptions are |
|
8 - Time spent querying (rendering) the data is more valuable than the one-time |
|
9 cost of loading it |
|
10 - The script will not be running multiple times in parallel. This is not |
|
11 normally likely because the script is likely to be called daily or less, |
|
12 not minutely. |
|
13 - Usage patterns will be similar to typical map rendering |
|
14 ''' |
|
15 |
|
16 import yaml |
|
17 import os |
|
18 import re |
|
19 import argparse |
|
20 import shutil |
|
21 |
|
22 # modules for getting data |
|
23 import zipfile |
|
24 import requests |
|
25 import io |
|
26 |
|
27 # modules for converting and postgres loading |
|
28 import subprocess |
|
29 import psycopg2 |
|
30 |
|
31 import logging |
|
32 |
|
33 |
|
def database_setup(conn, temp_schema, schema, metadata_table):
    """Ensure the staging schema and the metadata bookkeeping table exist.

    Creates ``temp_schema`` (where tables are loaded before being swapped
    into place) and the metadata table in ``schema`` that records each
    table's last-modified date. Both are created only if absent, and the
    DDL is committed before returning.
    """
    create_schema_sql = ('''CREATE SCHEMA IF NOT EXISTS {temp_schema};'''
                         .format(temp_schema=temp_schema))
    create_metadata_sql = (('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
                            ''' (name text primary key, last_modified text);''')
                           .format(schema=schema, metadata_table=metadata_table))
    with conn.cursor() as cur:
        cur.execute(create_schema_sql)
        cur.execute(create_metadata_sql)
    conn.commit()
|
42 |
|
43 |
|
class Table:
    '''Manage the lifecycle of one externally-sourced table.

    A table is loaded (by ogr2ogr, outside this class) into a temporary
    staging schema, then indexed/clustered by index(), and finally swapped
    into the destination schema by replace(), which also records the
    download date in the metadata table.

    NOTE(review): the table and schema names are interpolated directly into
    SQL strings, so callers must only pass names that are safe identifiers
    (main() enforces ASCII alphanumerics for table names).
    '''
    def __init__(self, name, conn, temp_schema, schema, metadata_table):
        # name: table name, used both as the SQL identifier and as the
        #   lookup key in the metadata table
        # conn: open psycopg2 connection shared by all operations
        # temp_schema: staging schema the data is first loaded into
        # schema: destination schema for finished tables
        # metadata_table: bookkeeping table (in the destination schema)
        #   holding per-table last-modified dates
        self._name = name
        self._conn = conn
        self._temp_schema = temp_schema
        self._dst_schema = schema
        self._metadata_table = metadata_table

    # Clean up the temporary schema in preparation for loading
    def clean_temp(self):
        '''Drop any leftover staging copy of this table (e.g. from an
        aborted earlier run) and commit.'''
        with self._conn.cursor() as cur:
            cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            self._conn.commit()

    # get the last modified date from the metadata table
    def last_modified(self):
        '''Return the recorded last-modified string for this table, or
        None (implicitly) if the table has never been loaded.'''
        with self._conn.cursor() as cur:
            cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table), [self._name])
            results = cur.fetchone()
            if results is not None:
                return results[0]

    def index(self):
        '''Prepare the freshly-loaded staging table for rendering.

        Drops the ogr2ogr artifact column, removes NULL geometries,
        clusters the rows by a spatial ordering, builds the final GiST
        index, and finishes with VACUUM ANALYZE (run under autocommit,
        since VACUUM cannot run inside a transaction).
        '''
        with self._conn.cursor() as cur:
            # Disable autovacuum while manipulating the table, since it'll get clustered towards the end.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # ogr creates a ogc_fid column we don't need
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
                        .format(name=self._name, temp_schema=self._temp_schema))

            # Null geometries are useless for rendering
            cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # sorting static tables helps performance and reduces size from the column drop above
            # (cluster on a temporary envelope index, then drop it and build
            # the final GiST index at fillfactor=100 since the table is static)
            cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
                         '''(ST_Envelope(way));'''
                         '''CLUSTER "{temp_schema}"."{name}" '''
                         '''USING "{name}_order";'''
                         '''DROP INDEX "{temp_schema}"."{name}_order";'''
                         '''CREATE INDEX ON "{temp_schema}"."{name}" '''
                         '''USING GIST (way) WITH (fillfactor=100);''')
                        .format(name=self._name, temp_schema=self._temp_schema))
            # Reset autovacuum. The table is static, so this doesn't really
            # matter since it'll never need a vacuum.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            self._conn.commit()

        # VACUUM can't be run in transaction, so autocommit needs to be turned on
        old_autocommit = self._conn.autocommit
        try:
            self._conn.autocommit = True
            with self._conn.cursor() as cur:
                cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
                            .format(name=self._name, temp_schema=self._temp_schema))
        finally:
            # Restore the previous autocommit setting even if VACUUM fails.
            self._conn.autocommit = old_autocommit

    def replace(self, new_last_modified):
        '''Swap the staging table into the destination schema.

        In a single transaction: drop the old destination table, move the
        staging table into the destination schema, and insert or update the
        metadata row with new_last_modified.
        '''
        with self._conn.cursor() as cur:
            cur.execute('''BEGIN;''')
            cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
                         '''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
                        .format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))

            # We checked if the metadata table had this table way up above
            cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                        [self._name])
            if cur.rowcount == 0:
                cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
                             '''(name, last_modified) VALUES (%s, %s)''')
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [self._name, new_last_modified])
            else:
                cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [new_last_modified, self._name])
            self._conn.commit()
|
128 |
|
129 |
|
def main():
    """Download external data sets and load them into a PostGIS database.

    Configuration comes from a YAML file (default external-data.yml);
    command-line options can override the data directory and database
    connection settings. For each configured source the data is fetched
    (with If-Modified-Since so unchanged data is skipped), optionally
    unzipped, loaded into a temporary schema with ogr2ogr, indexed and
    clustered, and finally swapped into the destination schema.

    Raises RuntimeError on unsafe table names or ogr2ogr failure.
    """
    # parse options
    parser = argparse.ArgumentParser(description="Load external data into a database")

    parser.add_argument("-f", "--force", action="store_true", help="Download new data, even if not required")

    parser.add_argument("-c", "--config", action="store", default="external-data.yml",
                        help="Name of configuration file (default external-data.yml)")
    parser.add_argument("-D", "--data", action="store", help="Override data download directory")

    parser.add_argument("-d", "--database", action="store", help="Override database name to connect to")
    parser.add_argument("-H", "--host", action="store",
                        help="Override database server host or socket directory")
    parser.add_argument("-p", "--port", action="store", help="Override database server port")
    parser.add_argument("-U", "--username", action="store", help="Override database user name")
    parser.add_argument("-v", "--verbose", action="store_true", help="Be more verbose. Overrides -q")
    parser.add_argument("-q", "--quiet", action="store_true", help="Only report serious problems")

    opts = parser.parse_args()

    if opts.verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    with open(opts.config) as config_file:
        config = yaml.safe_load(config_file)
        data_dir = opts.data or config["settings"]["data_dir"]
        os.makedirs(data_dir, exist_ok=True)

        # If the DB options are unspecified in both on the command line and in the
        # config file, libpq will pick what to use with the None
        database = opts.database or config["settings"].get("database")
        host = opts.host or config["settings"].get("host")
        port = opts.port or config["settings"].get("port")
        user = opts.username or config["settings"].get("username")
        with requests.Session() as s, \
                psycopg2.connect(database=database,
                                 host=host, port=port,
                                 user=user) as conn:

            s.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})

            # DB setup
            database_setup(conn, config["settings"]["temp_schema"],
                           config["settings"]["schema"],
                           config["settings"]["metadata_table"])

            for name, source in config["sources"].items():
                logging.info("Checking table {}".format(name))
                # Don't attempt to handle strange names
                # Even if there was code to escape them properly here, you don't want
                # in a style with all the quoting headaches
                if not re.match('''^[a-zA-Z0-9_]+$''', name):
                    # Fixed garbled wording of this error message
                    raise RuntimeError("Only ASCII alphanumeric table names are supported")

                workingdir = os.path.join(data_dir, name)
                # Clean up anything left over from an aborted run
                shutil.rmtree(workingdir, ignore_errors=True)

                os.makedirs(workingdir, exist_ok=True)

                this_table = Table(name, conn,
                                   config["settings"]["temp_schema"],
                                   config["settings"]["schema"],
                                   config["settings"]["metadata_table"])
                this_table.clean_temp()

                if not opts.force:
                    # A None value (table never loaded) makes requests omit
                    # the header, so the download is unconditional then.
                    headers = {'If-Modified-Since': this_table.last_modified()}
                else:
                    headers = {}

                download = s.get(source["url"], headers=headers)
                download.raise_for_status()

                # 200 means new data; 304 (Not Modified) falls through to the
                # else branch below and the table is left untouched.
                if (download.status_code == 200):
                    if "Last-Modified" in download.headers:
                        new_last_modified = download.headers["Last-Modified"]
                    else:
                        new_last_modified = None
                    if "archive" in source and source["archive"]["format"] == "zip":
                        # Context manager closes the archive promptly; the
                        # local is named to avoid shadowing the builtin zip().
                        with zipfile.ZipFile(io.BytesIO(download.content)) as zfile:
                            for member in source["archive"]["files"]:
                                zfile.extract(member, workingdir)

                    # Build the ogr2ogr PG connection string from the same
                    # settings used for the psycopg2 connection above.
                    ogrpg = "PG:dbname={}".format(database)

                    if port is not None:
                        ogrpg = ogrpg + " port={}".format(port)
                    if user is not None:
                        ogrpg = ogrpg + " user={}".format(user)
                    if host is not None:
                        ogrpg = ogrpg + " host={}".format(host)

                    # Spatial indexing is deferred to Table.index(), which
                    # also drops the ogc_fid column ogr2ogr creates.
                    ogrcommand = ["ogr2ogr",
                                  '-f', 'PostgreSQL',
                                  '-lco', 'GEOMETRY_NAME=way',
                                  '-lco', 'SPATIAL_INDEX=FALSE',
                                  '-lco', 'EXTRACT_SCHEMA_FROM_LAYER_NAME=YES',
                                  '-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]

                    if "ogropts" in source:
                        ogrcommand += source["ogropts"]

                    ogrcommand += [ogrpg, os.path.join(workingdir, source["file"])]

                    logging.debug("running {}".format(subprocess.list2cmdline(ogrcommand)))

                    # ogr2ogr can raise errors here, so they need to be caught
                    try:
                        subprocess.check_output(ogrcommand, stderr=subprocess.PIPE, universal_newlines=True)
                    except subprocess.CalledProcessError as e:
                        # Add more detail on stdout for the logs
                        logging.critical("ogr2ogr returned {} with layer {}".format(e.returncode, name))
                        logging.critical("Command line was {}".format(subprocess.list2cmdline(e.cmd)))
                        logging.critical("Output was\n{}".format(e.output))
                        raise RuntimeError("ogr2ogr error when loading table {}".format(name))

                    this_table.index()
                    this_table.replace(new_last_modified)
                else:
                    logging.info("Table {} did not require updating".format(name))
|
255 |
|
256 |
|
# Entry point guard: run main() only when executed as a script, not when
# the module is imported.
if __name__ == '__main__':
    main()