Mon, 14 Sep 2020 22:55:45 +0300
restore .hg...
0 | 1 | #!/usr/bin/env python3 |
2 | '''This script is designed to load quasi-static data into a PostGIS database | |
3 | for rendering maps. It differs from the usual scripts to do this in that it is | |
4 | designed to take its configuration from a file rather than be a series of shell | |
5 | commands. | |
6 | ||
7 | Some implicit assumptions are | |
8 | - Time spent querying (rendering) the data is more valuable than the one-time | |
9 | cost of loading it | |
10 | - The script will not be running multiple times in parallel. This is not | |
11 | normally likely because the script is likely to be called daily or less, | |
12 | not minutely. | |
13 | - Usage patterns will be similar to typical map rendering | |
14 | ''' | |
15 | ||
16 | import yaml | |
17 | import os | |
18 | import re | |
19 | import argparse | |
20 | import shutil | |
21 | ||
22 | # modules for getting data | |
23 | import zipfile | |
24 | import requests | |
25 | import io | |
26 | ||
27 | # modules for converting and postgres loading | |
28 | import subprocess | |
29 | import psycopg2 | |
30 | ||
31 | import logging | |
32 | ||
33 | ||
def database_setup(conn, temp_schema, schema, metadata_table):
    '''Create the staging schema and the metadata bookkeeping table.

    conn: open psycopg2 connection.
    temp_schema: schema tables are loaded into before being swapped live.
    schema: destination schema holding the metadata table.
    metadata_table: table mapping each loaded table name to the upstream
        Last-Modified value recorded at load time.

    Identifiers are interpolated with str.format rather than passed as
    query parameters because PostgreSQL cannot parameterize identifiers;
    they come from the local config file, and table names are validated
    in main() before reaching SQL.
    '''
    with conn.cursor() as cur:
        # Quote the identifier so a mixed-case temp_schema matches the
        # quoted "{temp_schema}" references used everywhere else.
        cur.execute('''CREATE SCHEMA IF NOT EXISTS "{temp_schema}";'''
                    .format(temp_schema=temp_schema))
        cur.execute(('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
                     ''' (name text primary key, last_modified text);''')
                    .format(schema=schema, metadata_table=metadata_table))
    conn.commit()
42 | ||
43 | ||
class Table:
    '''Manage one externally sourced table.

    Handles the table's staging copy in the temporary schema, its
    optimization (indexing/clustering) for rendering, its promotion into
    the destination schema, and its row in the metadata table that
    records the upstream Last-Modified value.
    '''
    def __init__(self, name, conn, temp_schema, schema, metadata_table):
        # name is interpolated into SQL identifiers below, so it must
        # already be validated (main() enforces ASCII alphanumeric).
        self._name = name
        self._conn = conn
        self._temp_schema = temp_schema
        self._dst_schema = schema
        self._metadata_table = metadata_table

    # Clean up the temporary schema in preparation for loading
    def clean_temp(self):
        '''Drop any leftover staging copy of this table.'''
        with self._conn.cursor() as cur:
            cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
                        .format(name=self._name, temp_schema=self._temp_schema))
        self._conn.commit()

    # get the last modified date from the metadata table
    def last_modified(self):
        '''Return the recorded Last-Modified string for this table, or
        None if the table has never been loaded.'''
        with self._conn.cursor() as cur:
            cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table), [self._name])
            results = cur.fetchone()
            if results is not None:
                return results[0]
            # implicit None when no metadata row exists

    def index(self):
        '''Optimize the freshly loaded staging table for rendering:
        drop the unneeded ogr2ogr id column, discard NULL geometries,
        cluster on a spatial ordering, build the GIST index, and
        VACUUM ANALYZE the result.'''
        with self._conn.cursor() as cur:
            # Disable autovacuum while manipulating the table, since it'll get clustered towards the end.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # ogr creates a ogc_fid column we don't need
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
                        .format(name=self._name, temp_schema=self._temp_schema))

            # Null geometries are useless for rendering
            cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # Sorting static tables helps performance and reduces size from
            # the column drop above. The ordering index is only needed for
            # CLUSTER, so it is dropped afterwards; the final GIST index is
            # packed (fillfactor=100) because the table will never change.
            cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
                         '''(ST_Envelope(way));'''
                         '''CLUSTER "{temp_schema}"."{name}" '''
                         '''USING "{name}_order";'''
                         '''DROP INDEX "{temp_schema}"."{name}_order";'''
                         '''CREATE INDEX ON "{temp_schema}"."{name}" '''
                         '''USING GIST (way) WITH (fillfactor=100);''')
                        .format(name=self._name, temp_schema=self._temp_schema))
            # Reset autovacuum. The table is static, so this doesn't really
            # matter since it'll never need a vacuum.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
        self._conn.commit()

        # VACUUM can't be run in transaction, so autocommit needs to be turned on
        old_autocommit = self._conn.autocommit
        try:
            self._conn.autocommit = True
            with self._conn.cursor() as cur:
                cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
                            .format(name=self._name, temp_schema=self._temp_schema))
        finally:
            # Restore the previous autocommit setting even if VACUUM fails.
            self._conn.autocommit = old_autocommit

    def replace(self, new_last_modified):
        '''Swap the staging table into the destination schema in one
        transaction and record new_last_modified in the metadata table.'''
        with self._conn.cursor() as cur:
            cur.execute('''BEGIN;''')
            cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
                         '''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
                        .format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))

            # Upsert by hand: probe for an existing metadata row for this
            # table, then INSERT or UPDATE accordingly.
            cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                        [self._name])
            if cur.rowcount == 0:
                cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
                             '''(name, last_modified) VALUES (%s, %s)''')
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [self._name, new_last_modified])
            else:
                cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [new_last_modified, self._name])
        self._conn.commit()
129 | ||
def main():
    '''Entry point: parse options, read the YAML config, and load each
    configured source into PostGIS, skipping any table whose upstream
    data has not changed since the recorded Last-Modified value.

    Raises RuntimeError on invalid table names or ogr2ogr failure;
    requests.HTTPError on download failure.
    '''
    # parse options
    parser = argparse.ArgumentParser(description="Load external data into a database")

    parser.add_argument("-f", "--force", action="store_true", help="Download new data, even if not required")

    parser.add_argument("-c", "--config", action="store", default="external-data.yml",
                        help="Name of configuration file (default external-data.yml)")
    parser.add_argument("-D", "--data", action="store", help="Override data download directory")

    parser.add_argument("-d", "--database", action="store", help="Override database name to connect to")
    parser.add_argument("-H", "--host", action="store",
                        help="Override database server host or socket directory")
    parser.add_argument("-p", "--port", action="store", help="Override database server port")
    parser.add_argument("-U", "--username", action="store", help="Override database user name")
    parser.add_argument("-v", "--verbose", action="store_true", help="Be more verbose. Overrides -q")
    parser.add_argument("-q", "--quiet", action="store_true", help="Only report serious problems")

    opts = parser.parse_args()

    if opts.verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    with open(opts.config) as config_file:
        config = yaml.safe_load(config_file)
        data_dir = opts.data or config["settings"]["data_dir"]
        os.makedirs(data_dir, exist_ok=True)

        # If the DB options are unspecified in both on the command line and in the
        # config file, libpq will pick what to use with the None
        database = opts.database or config["settings"].get("database")
        host = opts.host or config["settings"].get("host")
        port = opts.port or config["settings"].get("port")
        user = opts.username or config["settings"].get("username")
        with requests.Session() as s, \
                psycopg2.connect(database=database,
                                 host=host, port=port,
                                 user=user) as conn:

            s.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})

            # DB setup
            database_setup(conn, config["settings"]["temp_schema"],
                           config["settings"]["schema"],
                           config["settings"]["metadata_table"])

            for name, source in config["sources"].items():
                logging.info("Checking table {}".format(name))
                # Don't attempt to handle strange names
                # Even if there was code to escape them properly here, you don't want
                # in a style with all the quoting headaches
                if not re.match('''^[a-zA-Z0-9_]+$''', name):
                    raise RuntimeError("Only ASCII alphanumeric table names are supported")

                workingdir = os.path.join(data_dir, name)
                # Clean up anything left over from an aborted run
                shutil.rmtree(workingdir, ignore_errors=True)

                os.makedirs(workingdir, exist_ok=True)

                this_table = Table(name, conn,
                                   config["settings"]["temp_schema"],
                                   config["settings"]["schema"],
                                   config["settings"]["metadata_table"])
                this_table.clean_temp()

                if not opts.force:
                    # A None value here (table never loaded before) makes
                    # requests omit the If-Modified-Since header entirely.
                    headers = {'If-Modified-Since': this_table.last_modified()}
                else:
                    headers = {}

                download = s.get(source["url"], headers=headers)
                download.raise_for_status()

                # 304 Not Modified falls through to the else branch below
                if download.status_code == 200:
                    if "Last-Modified" in download.headers:
                        new_last_modified = download.headers["Last-Modified"]
                    else:
                        new_last_modified = None
                    if "archive" in source and source["archive"]["format"] == "zip":
                        # named to avoid shadowing the zip() builtin
                        zip_archive = zipfile.ZipFile(io.BytesIO(download.content))
                        for member in source["archive"]["files"]:
                            zip_archive.extract(member, workingdir)

                    # Build an ogr2ogr PG connection string mirroring the
                    # psycopg2 connection above; omitted parts use libpq defaults.
                    ogrpg = "PG:dbname={}".format(database)

                    if port is not None:
                        ogrpg = ogrpg + " port={}".format(port)
                    if user is not None:
                        ogrpg = ogrpg + " user={}".format(user)
                    if host is not None:
                        ogrpg = ogrpg + " host={}".format(host)

                    # Load into the temp schema without a spatial index;
                    # Table.index() builds a better one after clustering.
                    ogrcommand = ["ogr2ogr",
                                  '-f', 'PostgreSQL',
                                  '-lco', 'GEOMETRY_NAME=way',
                                  '-lco', 'SPATIAL_INDEX=FALSE',
                                  '-lco', 'EXTRACT_SCHEMA_FROM_LAYER_NAME=YES',
                                  '-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]

                    if "ogropts" in source:
                        ogrcommand += source["ogropts"]

                    ogrcommand += [ogrpg, os.path.join(workingdir, source["file"])]

                    logging.debug("running {}".format(subprocess.list2cmdline(ogrcommand)))

                    # ogr2ogr can raise errors here, so they need to be caught
                    try:
                        subprocess.check_output(ogrcommand, stderr=subprocess.PIPE, universal_newlines=True)
                    except subprocess.CalledProcessError as e:
                        # Add more detail on stdout for the logs
                        logging.critical("ogr2ogr returned {} with layer {}".format(e.returncode, name))
                        logging.critical("Command line was {}".format(subprocess.list2cmdline(e.cmd)))
                        logging.critical("Output was\n{}".format(e.output))
                        raise RuntimeError("ogr2ogr error when loading table {}".format(name))

                    this_table.index()
                    this_table.replace(new_last_modified)
                else:
                    logging.info("Table {} did not require updating".format(name))
256 | ||
# Allow use both as a script and as an importable module.
if __name__ == "__main__":
    main()