Thu, 29 Mar 2018 23:55:36 +0300
updates
#!/usr/bin/env python3 from flask import Flask, render_template, abort, send_from_directory, redirect from datetime import datetime, date, time, timedelta from os import path, listdir, environ from configparser import ConfigParser import locale app = Flask(__name__) from misc import * from busroute import reduce_schedule import buses suffix_regions = {'naantalin pikatie', 'helsingin valtatie', 'kansanpuisto'} # Varmista ettei järjestelmän kieliasetukset sotke muotoiluja def reset_locale(): locale.setlocale(locale.LC_ALL, locale.getdefaultlocale()) def activate_locale(language = None): language = language or language_for_page() class result: def __enter__(self): locale.setlocale(locale.LC_ALL, tr('locale', 'other', language = language)) def __exit__(self, *args): reset_locale() return result() reset_locale() # Lataa käännökset class Translator: def __init__(self): self.languages = {} def load_language(self, file_path): language_name = path.splitext(path.basename(file_path))[0] ini = ConfigParser() ini.read(path.join(file_path)) self.languages[language_name] = ini def __call__(self, name, *sections, language = None): language = language or language_for_page() for section in sections: try: return self.languages[language][section][name] except KeyError: try: return profile['tr:' + language + ':' + section][name] except KeyError: pass else: return name[:1].upper() + name[1:] tr = Translator() for file in listdir('tr'): tr.load_language(path.join('tr', file)) def language_for_page(): from flask import request if request.args.get('kääntämätön') is not None: return None else: for language_name in tr.languages: if request.args.get(language_name) is not None: return language_name else: return request.accept_languages.best_match(tr.languages) def sign_elements(schedule_entry, format = 'medium'): from math import ceil from busroute import greatly_simplify_name trip_length = schedule_entry['trip'].length - schedule_entry['stop'].traveled_distance regions = schedule_entry['trip'].concise_schedule(schedule_entry['stop']) if format == 'short': regions = [greatly_simplify_name(region) for region in regions] return reduce_schedule( regions, trip_length = trip_length, format = format) def sign(schedule_entry, format = 'medium'): sign = sign_elements(schedule_entry, format = format) if sign: sign_representation = ' - '.join(tr(place, 'places') for place in sign if place not in suffix_regions) sign_representation += ''.join(' ' + tr(place, 'suffix-places') for place in sign if place in suffix_regions) return sign_representation else: return schedule_entry['trip'].schedule[-1].stop.name def long_form_sign(schedule_entry, format = 'long'): from math import ceil trip_length = schedule_entry['trip'].length - schedule_entry['stop'].traveled_distance sign = reduce_schedule(schedule_entry['trip'].concise_schedule(schedule_entry['stop']), trip_length = trip_length, format = format) if sign: return { 'destination': tr(sign[-1], 'places'), 'via': [tr(place, 'places') for place in sign[:-1]], } else: return { 'destination': schedule_entry['trip'].schedule[-1].stop.name, 'via': [], } def imminent(schedule_entry): return (schedule_entry['time'] - now()) <= timedelta(minutes = 3) def first_halt_in_trip_in_place(trip, place): for halt in trip.schedule: if halt.stop.region == place: return halt else: return None place_abbreviations = ConfigParser() place_abbreviations.read('abbreviations.ini') def place_abbreviation(place): try: return place_abbreviations['abbreviations'][place] except KeyError: return place def trip_abbreviation(trip): entries = [trip.from_place] old_places = None starting_halt = None while True: remaining_length = trip.length if starting_halt: remaining_length -= starting_halt.traveled_distance places = reduce_schedule(trip.concise_schedule(starting_stop = starting_halt), trip_length = remaining_length, format = 'medium') new_places = set(places) - set(entries) if not new_places or places == old_places: break for place in places: if place in new_places: starting_halt = first_halt_in_trip_in_place(trip, place) entries += [place] break old_places = places if trip.to_place not in entries: entries += [trip.to_place] return trip.route.reference + ':' + '-'.join(map(place_abbreviation, entries)) def split_route_ref(route_ref): try: return list(parse_route_ref(route_ref)) except ValueError: return ['', route_ref, ''] @app.route('/stop/<reference>') def bus_stop_schedule(reference): from buses import bus_stops schedule = [] try: bus_stop = bus_stops[reference] except KeyError: abort(404) for schedule_entry in bus_stop.schedule(max_amount = 100, arrivals = True): route_ref = schedule_entry['trip'].route.reference schedule.append({ 'time': time_representation(schedule_entry['time']), 'route': route_ref, 'route-splice': split_route_ref(route_ref), 'sign': sign(schedule_entry), 'trip': schedule_entry['stop'].trip.name, 'night': is_night_time(schedule_entry['time']), 'imminent': imminent(schedule_entry), }) return render_template( 'stop.html', schedule = schedule, name = bus_stop.code + ' ' + tr(bus_stop.name, 'bus-stops', 'places'), link_to_map = bus_stop.location.link_to_map, region = hasattr(bus_stop, 'region') and bus_stop.region or None, location = bus_stop.location, cluster = bus_stop.cluster.url_name if len(bus_stop.cluster.stops) > 1 else None, tr = tr, ) def week_schedule(bus_stop, **kwargs): for i in range(-1, 7): yield from bus_stop.schedule_for_day(today() + timedelta(i), **kwargs) def route_key(route): match = re.search(r'^([a-zA-Z]*)(\d+)(.*)$', route) if match: groups = match.groups() return (groups[0], int(groups[1]), groups[2]) else: return (route,) def parse_route_ref(route_ref): from re import search match = search(r'^([^0-9]*)([0-9]+)(.*)$', route_ref) try: return match.group(1), int(match.group(2)), match.group(3) except AttributeError: raise ValueError(route_ref) def condense_route_list(route_list): def prepare_range_pool(range_pool): if len(range_pool) < 3: yield from map(str, range_pool) else: yield str(min(range_pool)) + '-' + str(max(range_pool)) range_pool = [] for route in route_list: try: route_int = int(route) except ValueError: yield from prepare_range_pool(range_pool) range_pool = [] yield route else: if not range_pool or route_int - 1 in range_pool: range_pool.append(route_int) else: yield from prepare_range_pool(range_pool) range_pool = [route_int] if range_pool: yield from prepare_range_pool(range_pool) def is_weekend_night(time): from datetime import timedelta adjusted_time = time - timedelta(hours = 4, minutes = 30) return adjusted_time.weekday() in [4, 5] and is_night_time(time) def describe(bus_stop): schedule = [] from collections import defaultdict, Counter from busroute import greatly_simplify_name destinations_per_route = defaultdict(Counter) def route_key(route_ref): try: return parse_route_ref(route_ref) except ValueError: return () def filter_names(names): if len(names) == 1 and names[0] == (bus_stop.region and greatly_simplify_name(bus_stop.region)): return type(names)() else: return names data = [] names = [] from collections import defaultdict night_routes = defaultdict(lambda: True) num_leaves = 0 all_routes = Counter() for schedule_entry in week_schedule(bus_stop, arrivals = True): #bus_stop.schedule(max_amount = 500, arrivals = True): sign_tuple = tuple(sign_elements(schedule_entry, format = 'short')) route = schedule_entry['trip'].route.reference night_routes[route] &= is_weekend_night(schedule_entry['time']) destinations_per_route[route][sign_tuple] += 1 all_routes[route] += 1 num_leaves += 1 night_routes = {key for key, value in night_routes.items() if value} routes_per_destination = defaultdict(set) for route in destinations_per_route: winner, count = destinations_per_route[route].most_common()[0] if all_routes[route] >= 10 or all_routes[route] / num_leaves >= 0.01: winner = filter_names(winner) #destinations_per_route[route] = winner and ' - '.join(winner) or '' routes_per_destination[winner].add(route) for key in routes_per_destination: routes_per_destination[key] = sorted(routes_per_destination[key], key = route_key) def route_len(route): length = 0 for char in route: if char.isdigit(): length += 1 else: break return length or len(route) from math import inf def route_key(route): return (route in night_routes, route_len(route), str(route)) def routes_key(routes): return min(route_key(route) for route in routes) result = [] for regions, routes in sorted( routes_per_destination.items(), key = lambda pair: routes_key(pair[1]) ): result.append(( list(condense_route_list(sorted(routes, key = route_key))), ' - '.join(tr(region, 'regions') for region in regions) )) return { 'night-routes': night_routes, 'all-night-routes': lambda entry, description: all(route in description['night-routes'] for route in entry[0]), 'simple': len(all_routes) <= 1, 'description': result, 'wtf': destinations_per_route, } @app.route('/stop_description/<reference>') def bus_stop_description(reference): from buses import bus_stops from pprint import pformat try: bus_stop = bus_stops[reference] except KeyError: abort(404) return '<pre>' + pformat(describe(bus_stop)) + '</pre>' @app.route('/api/describe_destination/<trip_reference>/<stop_reference>/<int:index>') def describe_destination(trip_reference, stop_reference, index): from buses import bus_stops, all_trips from busroute import simplify_name from flask import jsonify try: trip = all_trips[trip_reference] bus_stop = bus_stops[stop] schedule_entry = [schedule_entry for schedule_entry in trip.schedule if schedule_entry.stop.reference == stop_reference][index] except KeyError: abort(404) except ValueError: abort(404) return jsonify(long_form_sign({'trip': trip, 'stop': schedule_entry})) @app.route('/api/trip_abbreviation/<block_id>/<int:arrival_time_offset>') def api_trip_abbreviation(block_id, arrival_time_offset): from buses import trips_by_vehicle_info from flask import jsonify from datetime import timedelta try: trip = trips_by_vehicle_info[block_id, timedelta(seconds = arrival_time_offset)] except KeyError: abort(404) return jsonify({ 'abbreviation': trip_abbreviation(trip), }) def current_bus_day(): from datetime import date, datetime, timedelta day = date.today() if datetime.now().hour < 5: day -= timedelta(1) return day @app.route('/stop_display/<reference>') def stop_display(reference): from buses import bus_stops schedule = [] try: bus_stop = bus_stops[reference] except KeyError: abort(404) for i, schedule_entry in enumerate(bus_stop.schedule_for_day(current_bus_day(), arrivals = False)): schedule.append({ 'time_data': schedule_entry['time'], 'time': time_representation(schedule_entry['time']), 'route': schedule_entry['trip'].route.reference, 'sign': long_form_sign(schedule_entry, format = 'medium'), 'trip': schedule_entry['stop'].trip.name, 'night': is_night_time(schedule_entry['time']), 'imminent': imminent(schedule_entry), 'index': i, }) from pprint import pprint pprint(schedule) def destination_key(schedule_entry): sign = schedule_entry['sign'] return (sign['destination'],) + tuple(sign['via']) def filter_schedule(schedule, *, key): used = set() for schedule_entry in schedule: key_value = key(schedule_entry) if key_value not in used: used.add(key_value) yield schedule_entry schedule = list(filter_schedule(schedule, key = destination_key))[:6] if schedule: num_imminent_leaves = max(1, len([schedule_entry for schedule_entry in schedule if schedule_entry['time_data'] - schedule[0]['time_data'] < timedelta(minutes = 3)])) else: num_imminent_leaves = 1 return render_template( 'stop_display.html', schedule = schedule, ref = bus_stop.code, name = tr(bus_stop.name, 'bus-stops'), link_to_map = bus_stop.location.link_to_map, region = bus_stop.region, location = bus_stop.location, cluster = bus_stop.cluster.url_name if len(bus_stop.cluster.stops) > 1 else None, num_imminent_leaves = num_imminent_leaves, tr = tr, ) @app.route('/test') def test(): from buses import bus_stops bus_stop = bus_stops['16'] schedule = [{'imminent': True, 'index': 0, 'night': False, 'route': '2A', 'sign': {'destination': 'Kohmo', 'via': ['Nummenmäki', 'Kurala']}, 'time': '1m', 'trip': '00012501__3798generatedBlock'}, {'imminent': True, 'index': 1, 'night': False, 'route': '54', 'sign': {'destination': 'Ylioppilaskylä', 'via': []}, 'time': '2m', 'trip': '00014359__5656generatedBlock'}, {'imminent': True, 'index': 2, 'night': False, 'route': '1', 'sign': {'destination': 'Lentoasema ✈', 'via': ['Urusvuori']}, 'time': '3m', 'trip': '00010281__1281generatedBlock'}, {'imminent': False, 'index': 3, 'night': False, 'route': '56', 'sign': {'destination': 'Räntämäki', 'via': ['Nummenmäki', 'Halinen']}, 'time': '8m', 'trip': '00014686__5983generatedBlock'}, {'imminent': False, 'index': 4, 'night': False, 'route': '42', 'sign': {'destination': 'Varissuo', 'via': ['Kupittaa as', 'Itäharju']}, 'time': '18:30', 'trip': '00014010__5307generatedBlock'}, {'imminent': False, 'index': 5, 'night': False, 'route': '2B', 'sign': {'destination': 'Littoinen', 'via': ['Nummenmäki', 'Kurala', 'Kohmo']}, 'time': '18:35', 'trip': '00012629__3926generatedBlock'}] return render_template( 'stop_display.html', schedule = schedule, ref = bus_stop.code, name = tr(bus_stop.name, 'bus-stops'), link_to_map = bus_stop.location.link_to_map, region = bus_stop.region, location = bus_stop.location, cluster = bus_stop.cluster.url_name if len(bus_stop.cluster.stops) > 1 else None, num_imminent_leaves = max(1, sum(schedule_entry['imminent'] for schedule_entry in schedule)), tr = tr, ) def time_representation(time, relative = True): time_difference = time - now() if relative and timedelta(minutes = -1) < time_difference < timedelta(minutes = 1): return tr('right-now', 'misc-text') elif relative and time_difference > timedelta(0) and time_difference < timedelta(minutes = 10): return '%dm' % round(time_difference.seconds / 60) elif time.date() == today(): return '%d:%02d' % (time.hour, time.minute) elif time_difference < timedelta(7): with activate_locale(): return time.strftime('%-a %H:%M').replace(' ', '\xa0') else: with activate_locale(): return time.strftime('%-d.%-m. %H:%M').replace(' ', '\xa0') @app.route('/stop_cluster/<cluster_name>') def cluster_schedule(cluster_name): from buses import bus_stops, clusters_by_name schedule = [] try: cluster = clusters_by_name[cluster_name] except KeyError: abort(404) for schedule_entry in cluster.schedule(max_amount = 100): schedule.append({ 'time': time_representation(schedule_entry['time']), 'route': schedule_entry['trip'].route.reference, 'route-splice': split_route_ref(schedule_entry['trip'].route.reference), 'sign': sign(schedule_entry), 'trip': schedule_entry['stop'].trip.name, 'night': is_night_time(schedule_entry['time']), 'stop_id': schedule_entry['stop'].stop.reference, 'stop_code': schedule_entry['stop'].stop.code, 'stop_name': tr(schedule_entry['stop'].stop.name, 'bus-stops', 'places'), 'imminent': imminent(schedule_entry), }) stops_in_cluster = sorted( ({ 'id': stop.reference, 'code': stop.code, 'name': tr(stop.name, 'bus-stops'), } for stop in cluster.stops), key = lambda stop: (len(stop['id']), stop['id']) ) return render_template( 'cluster.html', schedule = schedule, name = tr(cluster.name, 'places', 'place-clusters', 'bus-stops'), link_to_map = cluster.center.link_to_map, location = cluster.center, stops_in_cluster = stops_in_cluster, amount_of_stops_in_cluster = len(stops_in_cluster), tr = tr, ) def day_class(weekday): if weekday < 5: return 'working-day' elif weekday == 5: return 'saturday' else: assert weekday == 6 return 'sunday' @app.route('/stop_week/<stop_reference>') def stop_week(stop_reference): from buses import bus_stops from flask import request if 'routes' in request.args: filtered_routes = set(request.args['routes'].split(';')) route_filter = lambda route: route in filtered_routes else: route_filter = lambda route: True if 'dest' in request.args: dests = {bus_stops.get(dest, None) for dest in request.args['dest'].split(';')} dests.discard(None) dest_filter = lambda trip: any(trip.contains_stop(dest) for dest in dests) else: dest_filter = lambda trip: True schedule = [] try: bus_stop = bus_stops[stop_reference] except KeyError: abort(404) week_model = {} for schedule_entry in week_schedule(bus_stop, arrivals = True): route_ref = schedule_entry['trip'].route.reference if route_filter(route_ref) and dest_filter(schedule_entry['trip']): time = schedule_entry['time'] date = schedule_entry['date'] if date not in week_model: week_model[date] = {} day_model = week_model[date] if time.hour not in day_model: day_model[time.hour] = [] hour_model = day_model[time.hour] hour_model.append({ 'route': route_ref, 'route-splice': split_route_ref(route_ref), 'trip': schedule_entry['stop'].trip.name, 'night': is_night_time(schedule_entry['time']), 'minute': time.minute, }) for day_offset in range(7): from datetime import date, datetime, timedelta day = date.today() + timedelta(day_offset) try: day_model = week_model[day] except KeyError: week_model[day] = {} else: def hour_key(x): return (x - 5) % 24 # Fill in missing hours from 5am to last active hour hours = set(day_model.keys()) | {5} sorted_hours = sorted(hours, key = hour_key) start_hour = sorted_hours[0] end_hour = sorted_hours[-1] + 1 for hour in range(start_hour, end_hour): hour_start = datetime(day.year, day.month, day.day, hour, 0) if hour not in day_model and hour_start >= datetime.now(): day_model[hour] = [] # Sort the hours, so that 5am is first and 4am is last. from collections import OrderedDict week_model[day] = OrderedDict( sorted( day_model.items(), key = lambda pair: hour_key(pair[0]), ) ) week_model = [ { 'day': day, 'schedule': schedule, 'day-class': day_class(day.weekday()) } for day, schedule in week_model.items() ] week_model = sorted(week_model, key = lambda day: day['day']) return render_template( 'stop_week.html', ref = bus_stop.code, name = tr(bus_stop.name, 'bus-stops'), tr = tr, week = week_model, description = describe(bus_stop), ) @app.route('/trip/<trip_reference>') def trip(trip_reference): from flask import request from buses import all_trips from busroute import simplify_name try: trip = all_trips[trip_reference] except KeyError: abort(404) schedule = [] region = '' for halt in trip.schedule: stop_time = datetime.combine(today(), time()) + halt.arrival_time formatted_time = time_representation(stop_time) if profile['regions']['use-regions']: if halt.stop.region != region and not (region and not halt.stop.region): if len(schedule) and not schedule[-1]['name']: schedule[-1]['name'] = tr(halt.stop.region or '', 'places') else: schedule.append({ 'name': tr(halt.stop.region or '', 'places'), 'time': formatted_time, 'stops': [], 'index': len(schedule), }) region = halt.stop.region else: schedule.append({ 'name': tr(halt.stop.name or '', 'bus-stops', 'places'), 'time': formatted_time, 'stops': [], 'index': len(schedule), }) schedule[-1]['stops'].append({ 'time': formatted_time, 'id': halt.stop.reference, 'code': halt.stop.code, 'name': tr(halt.stop.name, 'bus-stops', 'places'), }) sign = trip.concise_schedule() try: sign = [simplify_name(sign[0]), simplify_name(sign[-1])] except IndexError: sign = [trip.schedule[0].stop.name, trip.schedule[-1].stop.name] return render_template('trip.html', schedule = schedule, trip_reference = trip_reference, route = trip.route.reference, description = ' - '.join(tr(place, 'places') for place in sign), night = is_night_time(datetime.combine(today(), time()) + trip.schedule[-1].arrival_time), tr = tr, length = trip.length / 1000, ) @app.route('/route/<name>') def route_page(name): from buses import routes route = routes[name.upper()] schedule = [] for trip in route.trips: if trip.is_served_at(today()) and datetime.combine(today(), time()) + trip.schedule[-1].arrival_time < now(): schedule.append({ 'name': trip.reference, 'from': trip.from_place, 'to': trip.to_place, 'time': time_representation(datetime.combine(today(), time()) + trip.schedule[0].departure_time), }) return render_template('route.html', name = route.reference + ' ' + route.description, tr = tr, schedule = schedule, ) @app.route('/') def index(): return redirect('stop_cluster/kauppatori') @app.route('/pysäkki/<reference>') def redirect_pysäkki(reference): return redirect('stop/' + str(reference)) @app.route('/pysäkkiryhmä/<reference>') def redirect_pysäkkiryhmä(reference): return redirect('stop_cluster/' + str(reference)) @app.route('/ajovuoro/<reference>') def redirect_ajovuoro(reference): return redirect('trip/' + str(reference)) @app.route('/static/<path:path>') def static_file(path): return send_from_directory(path.join('static', path)) from argparse import ArgumentParser parser = ArgumentParser() parser.add_argument('gtfs_zip_path') parser.add_argument('profile_path') if __name__ == '__main__': parser.add_argument('-p', '--port', type = int, default = 5000) parser.add_argument('-d', '--debug', action = 'store_true') args = parser.parse_args() profile.read(args.profile_path) buses.load_buses(args.gtfs_zip_path) if __name__ == '__main__': app.run(debug = args.debug, port = args.port)