itienary_processing.py

changeset 3
10ce28475e9c
parent 2
7378b802ddf8
child 4
ac067a42b00f
equal deleted inserted replaced
2:7378b802ddf8 3:10ce28475e9c
1 #!/usr/bin/env python3 1 #!/usr/bin/env python3
2 import sqlalchemy 2 import sqlalchemy
3 import sqlalchemy.orm 3 import sqlalchemy.orm
4 import multiprocessing
4 from regions import parse_regions 5 from regions import parse_regions
5 from datamodel import * 6 from datamodel import *
# Open the SQLite database, make sure every table declared on GtfsBase
# exists, and create the session shared by the rest of the script.
engine = sqlalchemy.create_engine('sqlite:///gtfs.db')
GtfsBase.metadata.create_all(engine)
_make_session = sqlalchemy.orm.sessionmaker(bind=engine)
session = _make_session()
22 .limit(1000) 23 .limit(1000)
23 24
def length_left(stoptime):
    """Return the shape length remaining after ``stoptime``.

    This is the length of the trip's shape minus the distance already
    travelled along that shape at this stop time.
    """
    shape_length = stoptime.trip.shape.length
    travelled = stoptime.shape_distance_traveled
    return shape_length - travelled
26 27
28 from datetime import datetime, timedelta
# Running totals of the time spent in queries and in processing.
time_in_db = timedelta(0)
time_in_process = timedelta(0)

# Number of stop times handled so far; used for the progress percentage.
k = 0

# Count the stop times that still have no destination.  The comparison must
# stay `== None`: SQLAlchemy overloads `==` to emit IS NULL, whereas
# `is None` would be evaluated by Python and never reach the database.
_without_destination = session.query(GtfsStopTime).filter(
    GtfsStopTime.destination == None
)
amount = _without_destination.count()
got_stoptimes = amount != 0
34 from threading import Thread
35
def get_filtered_itinerary(stoptime):
    """Return the filtered regions of the stops that follow ``stoptime``.

    Looks up every stop time of the same trip whose ``stop_sequence`` is
    greater than that of ``stoptime``, takes the ``stop_region`` of each
    entry's stop and passes that sequence through ``filter_itinerary``.
    The module-level ``session`` is used for the query.

    :param stoptime: object exposing ``trip_id`` and ``stop_sequence``
    :return: list built from the output of ``filter_itinerary``
    """
    remaining_stops = (
        session.query(GtfsStopTime)
        .filter(GtfsStopTime.trip_id == stoptime.trip_id)
        .filter(GtfsStopTime.stop_sequence > stoptime.stop_sequence)
        # Without an explicit ORDER BY the database may return the rows in
        # any order; sort so the regions come out in stop-sequence order.
        .order_by(GtfsStopTime.stop_sequence)
    )
    return list(filter_itinerary(
        entry.stop.stop_region for entry in remaining_stops
    ))
43
def get_destination(stoptime, itinerary):
    """Build the destination string for ``stoptime``.

    Calls ``busroute.destinations_list`` with the given itinerary, the
    length left on the trip (converted to float) and the module-level
    ``regions``, then joins the returned items with ``'-'``.
    """
    # Imported at call time rather than at module load.
    from busroute import destinations_list

    remaining_length = float(length_left(stoptime))
    parts = destinations_list(
        itinerary=itinerary,
        trip_length=remaining_length,
        regions=regions,
    )
    return '-'.join(parts)
30 52
# Main loop: keeps running passes for as long as the previous pass found stop
# times; each pass fetches a batch with get_stoptimes(session) and assigns a
# destination to every stop time in it.
# Lines carrying two numbers show the old and the new revision side by side;
# lines carrying a single number exist in only one of the two revisions.
31 while got_stoptimes: 53 while got_stoptimes:
# Old revision printed the percentage on every pass; the new revision prints
# it only once k > 0 and appends " done".
32 print('%.2f%%' % (k * 100 / amount)) 54 if k > 0:
55 print('%.2f%% done' % (k * 100 / amount))
# Commented-out timing report built from time_in_db / time_in_process.
56 #print('%s spent in query, %s spent in processing (%.1f%% of time spent processing)' % (time_in_db, time_in_process, (time_in_process / (time_in_db + time_in_process) * 100)))
# Reset the flag; it is set again below as soon as the batch yields a row.
33 got_stoptimes = False 57 got_stoptimes = False
# New revision materialises the batch, then maps get_filtered_itinerary over
# it in a pool of 12 worker processes.
# NOTE(review): a new Pool is created on every pass of the while loop;
# creating it once before the loop would avoid repeated process start-up.
# NOTE(review): get_filtered_itinerary uses the module-level session, so the
# workers query through state created in the parent process - confirm this
# is safe for the SQLite connection, or give each worker its own session.
34 for stoptime in get_stoptimes(session): 58 stoptimes = list(get_stoptimes(session))
59 with multiprocessing.Pool(12) as p:
60 itineraries = list(p.map(get_filtered_itinerary, stoptimes))
# Pair each stop time with its itinerary and store the computed destination.
61 for stoptime, itinerary in zip(stoptimes, itineraries):
35 got_stoptimes = True 62 got_stoptimes = True
63 destination = get_destination(stoptime, itinerary)
64 stoptime.destination = destination
36 k += 1 65 k += 1
# Old revision only: the inline itinerary and destination computation that
# the new revision performs through get_filtered_itinerary and
# get_destination instead.
37 itinerary = list(filter_itinerary(
38 entry.stop.stop_region
39 for entry in session.query(GtfsStopTime)
40 .filter(GtfsStopTime.trip_id == stoptime.trip_id)
41 .filter(GtfsStopTime.stop_sequence > stoptime.stop_sequence)
42 ))
43 from busroute import destinations_list
44 dests = destinations_list(
45 itinerary = itinerary,
46 trip_length = float(length_left(stoptime)),
47 regions = regions,
48 )
49 stoptime.destination = '-'.join(dests)
# Commit the destinations assigned above.
# NOTE(review): indentation is lost in this view, so whether the commit runs
# once per pass or once after the loop cannot be told from here.
50 session.commit() 66 session.commit()

mercurial