Mon, 05 Mar 2018 12:36:53 +0200
add replacement
#!/usr/bin/env python3
"""Flask application that renders bus stop, stop cluster, trip and route schedules."""
from flask import Flask, render_template, abort, send_from_directory, redirect, url_for
from datetime import datetime, date, time, timedelta
from os import path, listdir
from configparser import ConfigParser
import locale
import re
# `profile`, `now`, `today` and `is_night_time` are not defined in this file;
# presumably they are provided by this star import.
from misc import *
from busroute import reduce_schedule
import buses

app = Flask(__name__)

# Places that sign() appends to the end of a sign (translated through the
# 'suffix-places' section) instead of joining them with ' - '.
suffix_regions = {'naantalin pikatie', 'helsingin valtatie', 'kansanpuisto'}


# Make sure the system's language settings do not interfere with formatting.
def reset_locale():
    """Set all locale categories to the value of locale.getdefaultlocale()."""
    # NOTE(review): locale.getdefaultlocale() is deprecated since Python 3.11;
    # left unchanged because a replacement may select a different locale.
    locale.setlocale(locale.LC_ALL, locale.getdefaultlocale())


def activate_locale(language = None):
    """Return a context manager that activates the locale of `language`.

    `language` defaults to the language of the current request. The locale
    name is the translation of 'locale' in the 'other' section. Leaving the
    context calls reset_locale(). When no language applies (untranslated
    page) the locale is left as it is instead of failing.
    """
    language = language or language_for_page()

    class result:
        def __enter__(self):
            # Without a language there is no locale name to look up.
            if language is not None:
                locale.setlocale(locale.LC_ALL, tr('locale', 'other', language = language))

        def __exit__(self, *args):
            reset_locale()

    return result()


reset_locale()


# Load the translations.
class Translator:
    """Callable translation table backed by one ini file per language."""

    def __init__(self):
        # Maps language name -> ConfigParser loaded from that language's file.
        self.languages = {}

    def load_language(self, file_path):
        """Load `file_path`; the file name without extension is the language name."""
        language_name = path.splitext(path.basename(file_path))[0]
        ini = ConfigParser()
        ini.read(file_path)
        self.languages[language_name] = ini

    def __call__(self, name, *sections, language = None):
        """Translate `name`, trying each of `sections` in order.

        For every section the language file is consulted first and then the
        profile section 'tr:<language>:<section>'. If nothing matches, or if
        no language applies, `name` is returned with its first letter
        capitalised.
        """
        language = language or language_for_page()
        # language is None on untranslated pages and when no language matches
        # the request; concatenating it below used to raise TypeError.
        if language is not None:
            for section in sections:
                try:
                    return self.languages[language][section][name]
                except KeyError:
                    pass
                try:
                    return profile['tr:' + language + ':' + section][name]
                except KeyError:
                    pass
        return name[:1].upper() + name[1:]


tr = Translator()
for file in listdir('tr'):
    tr.load_language(path.join('tr', file))


def language_for_page():
    """Return the language of the current request, or None for no translation.

    The 'kääntämätön' query argument disables translation. Otherwise a query
    argument named after a loaded language selects it, and failing that the
    Accept-Language header is matched against the loaded languages.
    """
    from flask import request
    if request.args.get('kääntämätön') is not None:
        return None
    for language_name in tr.languages:
        if request.args.get(language_name) is not None:
            return language_name
    return request.accept_languages.best_match(tr.languages)


def sign_elements(schedule_entry, long = False):
    """Return the reduced list of places for the rest of the trip from this stop."""
    trip_length = schedule_entry['trip'].length - schedule_entry['stop'].traveled_distance
    return reduce_schedule(
        schedule_entry['trip'].concise_schedule(schedule_entry['stop']),
        trip_length = trip_length,
        long = long)


def sign(schedule_entry, long = False):
    """Return the destination sign text for a schedule entry.

    Ordinary places are translated and joined with ' - '; places listed in
    suffix_regions are appended after them, separated by spaces. Falls back to
    the name of the last stop of the trip when there are no sign elements.
    """
    places = sign_elements(schedule_entry, long = long)
    if not places:
        return schedule_entry['trip'].schedule[-1].stop.name
    main_part = ' - '.join(tr(place, 'places') for place in places if place not in suffix_regions)
    suffix_part = ''.join(' ' + tr(place, 'suffix-places') for place in places if place in suffix_regions)
    return main_part + suffix_part


def long_form_sign(schedule_entry):
    """Return the long sign as a dict with 'destination' and a 'via' list."""
    places = sign_elements(schedule_entry, long = True)
    if places:
        return {
            'destination': tr(places[-1], 'places'),
            'via': [tr(place, 'places') for place in places[:-1]],
        }
    return {
        'destination': schedule_entry['trip'].schedule[-1].stop.name,
        'via': [],
    }


def imminent(schedule_entry):
    """Return True when the entry's time is at most three minutes after now()."""
    return (schedule_entry['time'] - now()) <= timedelta(minutes = 3)


def first_halt_in_trip_in_place(trip, place):
    """Return the first halt of `trip` whose stop region is `place`, else None."""
    for halt in trip.schedule:
        if halt.stop.region == place:
            return halt
    return None


place_abbreviations = ConfigParser()
place_abbreviations.read('abbreviations.ini')


def place_abbreviation(place):
    """Return the configured abbreviation of `place`, or `place` itself."""
    try:
        return place_abbreviations['abbreviations'][place]
    except KeyError:
        return place


def trip_abbreviation(trip):
    """Return '<route reference>:<place>-<place>-...' describing `trip`.

    Starting from the origin, the reduced schedule is recomputed from the
    first halt of each newly found place, so that one new place is added per
    round until no new places turn up. The destination is always included.
    """
    entries = [trip.from_place]
    old_places = None
    starting_halt = None
    while True:
        remaining_length = trip.length
        if starting_halt:
            remaining_length -= starting_halt.traveled_distance
        places = reduce_schedule(
            trip.concise_schedule(starting_stop = starting_halt),
            trip_length = remaining_length,
            long = False)
        new_places = set(places) - set(entries)
        # Stop when nothing new appears or the result no longer changes.
        if not new_places or places == old_places:
            break
        for place in places:
            if place in new_places:
                starting_halt = first_halt_in_trip_in_place(trip, place)
                entries += [place]
                break
        old_places = places
    if trip.to_place not in entries:
        entries += [trip.to_place]
    return trip.route.reference + ':' + '-'.join(map(place_abbreviation, entries))


def split_route_ref(route_ref):
    """Split a route reference into [prefix, number, suffix].

    A reference without digits is returned as ['', route_ref, ''].
    """
    try:
        return list(parse_route_ref(route_ref))
    except ValueError:
        return ['', route_ref, '']


def _bus_stop_or_404(reference):
    """Return the bus stop with the given reference or abort with 404."""
    from buses import bus_stops
    try:
        return bus_stops[reference]
    except KeyError:
        abort(404)


def _midnight_today():
    """Return today() at 00:00 as a datetime; trip time offsets are added to it."""
    return datetime.combine(today(), time())


@app.route('/stop/<reference>')
def bus_stop_schedule(reference):
    """Render the next arrivals (at most 100) of one bus stop."""
    bus_stop = _bus_stop_or_404(reference)
    schedule = []
    for schedule_entry in bus_stop.schedule(max_amount = 100, arrivals = True):
        route_ref = schedule_entry['trip'].route.reference
        schedule.append({
            'time': time_representation(schedule_entry['time']),
            'route': route_ref,
            'route-splice': split_route_ref(route_ref),
            'sign': sign(schedule_entry),
            'trip': schedule_entry['stop'].trip.name,
            'night': is_night_time(schedule_entry['time']),
            'imminent': imminent(schedule_entry),
        })
    return render_template(
        'stop.html',
        schedule = schedule,
        name = bus_stop.code + ' ' + tr(bus_stop.name, 'bus-stops', 'places'),
        link_to_map = bus_stop.location.link_to_map,
        # A missing or empty region is passed to the template as None.
        region = getattr(bus_stop, 'region', None) or None,
        location = bus_stop.location,
        cluster = bus_stop.cluster.url_name if len(bus_stop.cluster.stops) > 1 else None,
        tr = tr,
    )


def week_schedule(bus_stop, **kwargs):
    """Yield the stop's schedule entries from yesterday through six days ahead."""
    for i in range(-1, 7):
        yield from bus_stop.schedule_for_day(today() + timedelta(i), **kwargs)


def route_key(route):
    """Return a sort key (letters, number, rest) for a route reference.

    References that do not match the pattern yield the one-tuple (route,).
    """
    match = re.search(r'^([a-zA-Z]*)(\d+)(.*)$', route)
    if match:
        groups = match.groups()
        return (groups[0], int(groups[1]), groups[2])
    return (route,)


def parse_route_ref(route_ref):
    """Split a route reference into (prefix, number as int, suffix).

    Raises ValueError if the reference contains no digits.
    """
    match = re.search(r'^([^0-9]*)([0-9]+)(.*)$', route_ref)
    if match is None:
        raise ValueError(route_ref)
    return match.group(1), int(match.group(2)), match.group(3)


@app.route('/stop_description/<reference>')
def bus_stop_description(reference):
    """Return a preformatted debug dump of which routes serve which destinations.

    For every route the most common sign over week_schedule() is picked; it is
    kept if it occurs at least 10 times or on at least 1 % of all departures.
    """
    from collections import defaultdict, Counter
    from busroute import simplify_name
    from pprint import pformat
    bus_stop = _bus_stop_or_404(reference)

    def route_key(route_ref):
        # Unparseable references sort first.
        try:
            return parse_route_ref(route_ref)
        except ValueError:
            return ()

    def filter_names(names):
        # A sign that consists only of the stop's own region carries no
        # information, so it is replaced by an empty sequence of the same type.
        own_region = bus_stop.region and simplify_name(bus_stop.region)
        if len(names) == 1 and names[0] == own_region:
            return type(names)()
        return names

    # Never populated at the moment, but still part of the output below.
    names = []
    destinations_per_route = defaultdict(Counter)
    # A route stays marked as a night route only while all of its departures
    # are at night time.
    night_routes = defaultdict(lambda: True)
    num_leaves = 0
    for schedule_entry in week_schedule(bus_stop, arrivals = True):
        route_ref = schedule_entry['trip'].route.reference
        sign_tuple = tuple(sign_elements(schedule_entry))
        night_routes[route_ref] &= is_night_time(schedule_entry['time'])
        destinations_per_route[route_ref][sign_tuple] += 1
        num_leaves += 1
    night_routes = {key for key, value in night_routes.items() if value}

    routes_per_destination = defaultdict(set)
    for route, destinations in destinations_per_route.items():
        winner, count = destinations.most_common()[0]
        if count >= 10 or count / num_leaves >= 0.01:
            routes_per_destination[filter_names(winner)].add(route)
    routes_per_destination = {
        destination: sorted(routes, key = route_key)
        for destination, routes in routes_per_destination.items()
    }

    return ''.join([
        '<pre>',
        'names: ', str(names), '\n ',
        'night_routes: ', str(night_routes), '\n ',
        'destinations_per_route: ', pformat(dict(destinations_per_route)), '\n',
        'routes_per_destination: ', pformat(routes_per_destination), '</pre>',
    ])


@app.route('/api/describe_destination/<trip_reference>/<stop_reference>/<int:index>')
def describe_destination(trip_reference, stop_reference, index):
    """Return the long-form sign of a trip as seen from one of its halts, as JSON.

    `index` selects among the halts of the trip at `stop_reference`. Responds
    with 404 if the trip or the stop is unknown or `index` is out of range.
    """
    from buses import bus_stops, all_trips
    from flask import jsonify
    try:
        trip = all_trips[trip_reference]
        # Only checks that the stop exists (this used to look up the
        # undefined name `stop`).
        bus_stops[stop_reference]
        halts_at_stop = [halt for halt in trip.schedule if halt.stop.reference == stop_reference]
        halt = halts_at_stop[index]
    except (KeyError, IndexError, ValueError):
        abort(404)
    return jsonify(long_form_sign({'trip': trip, 'stop': halt}))


@app.route('/api/trip_abbreviation/<block_id>/<int:arrival_time_offset>')
def api_trip_abbreviation(block_id, arrival_time_offset):
    """Return the abbreviation of the trip found by block id and offset in seconds."""
    from buses import trips_by_vehicle_info
    from flask import jsonify
    try:
        trip = trips_by_vehicle_info[block_id, timedelta(seconds = arrival_time_offset)]
    except KeyError:
        abort(404)
    return jsonify({
        'abbreviation': trip_abbreviation(trip),
    })


def _render_stop_display(bus_stop, schedule, num_imminent_leaves):
    """Render 'stop_display.html' for `bus_stop` with the given schedule rows."""
    return render_template(
        'stop_display.html',
        schedule = schedule,
        ref = bus_stop.code,
        name = tr(bus_stop.name, 'bus-stops'),
        link_to_map = bus_stop.location.link_to_map,
        region = bus_stop.region,
        location = bus_stop.location,
        cluster = bus_stop.cluster.url_name if len(bus_stop.cluster.stops) > 1 else None,
        num_imminent_leaves = num_imminent_leaves,
        tr = tr,
    )


@app.route('/stop_display/<reference>')
def stop_display(reference):
    """Render a departure display: the first six distinct signs of today."""
    from itertools import islice
    bus_stop = _bus_stop_or_404(reference)
    schedule = []
    for i, schedule_entry in enumerate(bus_stop.schedule_for_day(today(), arrivals = False)):
        schedule.append({
            'time_data': schedule_entry['time'],
            'time': time_representation(schedule_entry['time']),
            'route': schedule_entry['trip'].route.reference,
            'sign': long_form_sign(schedule_entry),
            'trip': schedule_entry['stop'].trip.name,
            'night': is_night_time(schedule_entry['time']),
            'imminent': imminent(schedule_entry),
            'index': i,
        })

    def destination_key(schedule_entry):
        entry_sign = schedule_entry['sign']
        return (entry_sign['destination'],) + tuple(entry_sign['via'])

    def filter_schedule(schedule, *, key):
        # Yield only the first entry for every distinct key value.
        used = set()
        for schedule_entry in schedule:
            key_value = key(schedule_entry)
            if key_value not in used:
                used.add(key_value)
                yield schedule_entry

    schedule = list(islice(filter_schedule(schedule, key = destination_key), 6))
    if schedule:
        # Entries leaving within three minutes of the first shown entry.
        first_time = schedule[0]['time_data']
        num_imminent_leaves = max(1, sum(
            1 for schedule_entry in schedule
            if schedule_entry['time_data'] - first_time < timedelta(minutes = 3)
        ))
    else:
        num_imminent_leaves = 1
    return _render_stop_display(bus_stop, schedule, num_imminent_leaves)


@app.route('/test')
def test():
    """Render the stop display of stop '16' with fixed sample data."""
    bus_stop = _bus_stop_or_404('16')
    schedule = [
        {'imminent': True, 'index': 0, 'night': False, 'route': '2A',
         'sign': {'destination': 'Kohmo', 'via': ['Nummenmäki', 'Kurala']},
         'time': '1m', 'trip': '00012501__3798generatedBlock'},
        {'imminent': True, 'index': 1, 'night': False, 'route': '54',
         'sign': {'destination': 'Ylioppilaskylä', 'via': []},
         'time': '2m', 'trip': '00014359__5656generatedBlock'},
        {'imminent': True, 'index': 2, 'night': False, 'route': '1',
         'sign': {'destination': 'Lentoasema ✈', 'via': ['Urusvuori']},
         'time': '3m', 'trip': '00010281__1281generatedBlock'},
        {'imminent': False, 'index': 3, 'night': False, 'route': '56',
         'sign': {'destination': 'Räntämäki', 'via': ['Nummenmäki', 'Halinen']},
         'time': '8m', 'trip': '00014686__5983generatedBlock'},
        {'imminent': False, 'index': 4, 'night': False, 'route': '42',
         'sign': {'destination': 'Varissuo', 'via': ['Kupittaa as', 'Itäharju']},
         'time': '18:30', 'trip': '00014010__5307generatedBlock'},
        {'imminent': False, 'index': 5, 'night': False, 'route': '2B',
         'sign': {'destination': 'Littoinen', 'via': ['Nummenmäki', 'Kurala', 'Kohmo']},
         'time': '18:35', 'trip': '00012629__3926generatedBlock'},
    ]
    num_imminent_leaves = max(1, sum(schedule_entry['imminent'] for schedule_entry in schedule))
    return _render_stop_display(bus_stop, schedule, num_imminent_leaves)


def time_representation(time, relative = True):
    """Format `time` for display.

    With `relative` set, times within a minute of now() give the translated
    'right-now' text and times less than ten minutes ahead give '<n>m'.
    Otherwise times today are shown as H:MM, times less than a week ahead
    with the weekday, and anything later with day and month. Spaces in the
    last two forms are replaced by no-break spaces.
    """
    time_difference = time - now()
    if relative and timedelta(minutes = -1) < time_difference < timedelta(minutes = 1):
        return tr('right-now', 'misc-text')
    if relative and timedelta(0) < time_difference < timedelta(minutes = 10):
        return '%dm' % round(time_difference.total_seconds() / 60)
    if time.date() == today():
        return '%d:%02d' % (time.hour, time.minute)
    if time_difference < timedelta(7):
        time_format = '%-a %H:%M'
    else:
        time_format = '%-d.%-m. %H:%M'
    # Weekday names depend on the locale of the page's language.
    with activate_locale():
        return time.strftime(time_format).replace(' ', '\xa0')


@app.route('/stop_cluster/<cluster_name>')
def cluster_schedule(cluster_name):
    """Render the next departures (at most 100) of all stops in a cluster."""
    from buses import clusters_by_name
    try:
        cluster = clusters_by_name[cluster_name]
    except KeyError:
        abort(404)
    schedule = []
    for schedule_entry in cluster.schedule(max_amount = 100):
        route_ref = schedule_entry['trip'].route.reference
        halt = schedule_entry['stop']
        schedule.append({
            'time': time_representation(schedule_entry['time']),
            'route': route_ref,
            'route-splice': split_route_ref(route_ref),
            'sign': sign(schedule_entry),
            'trip': halt.trip.name,
            'night': is_night_time(schedule_entry['time']),
            'stop_id': halt.stop.reference,
            'stop_code': halt.stop.code,
            'stop_name': tr(halt.stop.name, 'bus-stops', 'places'),
            'imminent': imminent(schedule_entry),
        })
    # Shorter ids first, so that purely numeric ids end up in numeric order.
    stops_in_cluster = sorted(
        ({
            'id': stop.reference,
            'code': stop.code,
            'name': tr(stop.name, 'bus-stops'),
        } for stop in cluster.stops),
        key = lambda stop: (len(stop['id']), stop['id'])
    )
    return render_template(
        'cluster.html',
        schedule = schedule,
        name = tr(cluster.name, 'places', 'place-clusters', 'bus-stops'),
        link_to_map = cluster.center.link_to_map,
        location = cluster.center,
        stops_in_cluster = stops_in_cluster,
        amount_of_stops_in_cluster = len(stops_in_cluster),
        tr = tr,
    )


def day_class(weekday):
    """Map a weekday number (0 = Monday ... 6 = Sunday) to a day class name."""
    if weekday < 5:
        return 'working-day'
    if weekday == 5:
        return 'saturday'
    assert weekday == 6
    return 'sunday'


@app.route('/stop_week/<stop_reference>')
def stop_week(stop_reference):
    """Render a week timetable of one stop, grouped by day and hour.

    Query arguments: 'routes' is a ';'-separated list of route references to
    include, 'dest' a ';'-separated list of stop references of which the trip
    must contain at least one.
    """
    from collections import OrderedDict
    from buses import bus_stops
    from flask import request
    if 'routes' in request.args:
        filtered_routes = set(request.args['routes'].split(';'))
        route_filter = lambda route: route in filtered_routes
    else:
        route_filter = lambda route: True
    if 'dest' in request.args:
        # Unknown destination references are ignored.
        dests = {bus_stops.get(dest, None) for dest in request.args['dest'].split(';')}
        dests.discard(None)
        dest_filter = lambda trip: any(trip.contains_stop(dest) for dest in dests)
    else:
        dest_filter = lambda trip: True
    bus_stop = _bus_stop_or_404(stop_reference)

    # week_model maps date -> {hour -> list of departures}.
    week_model = {}
    for schedule_entry in week_schedule(bus_stop, arrivals = True):
        route_ref = schedule_entry['trip'].route.reference
        if not (route_filter(route_ref) and dest_filter(schedule_entry['trip'])):
            continue
        entry_time = schedule_entry['time']
        day_model = week_model.setdefault(schedule_entry['date'], {})
        day_model.setdefault(entry_time.hour, []).append({
            'route': route_ref,
            'route-splice': split_route_ref(route_ref),
            'trip': schedule_entry['stop'].trip.name,
            'night': is_night_time(entry_time),
            'minute': entry_time.minute,
        })

    def hour_key(x):
        # Orders the hours so that 5am is first and 4am is last.
        return (x - 5) % 24

    for day_offset in range(7):
        day = date.today() + timedelta(day_offset)
        try:
            day_model = week_model[day]
        except KeyError:
            week_model[day] = {}
            continue
        # Fill in missing hours from 5am to last active hour
        hours = set(day_model.keys()) | {5}
        sorted_hours = sorted(hours, key = hour_key)
        start_hour = sorted_hours[0]
        end_hour = sorted_hours[-1] + 1
        # NOTE(review): if the last active hour is after midnight (0-4) this
        # range is empty and nothing gets filled in - confirm whether that is
        # intended.
        for hour in range(start_hour, end_hour):
            hour_start = datetime(day.year, day.month, day.day, hour, 0)
            # Empty hours that have already passed are not added.
            if hour not in day_model and hour_start >= datetime.now():
                day_model[hour] = []
        # Sort the hours, so that 5am is first and 4am is last.
        week_model[day] = OrderedDict(
            sorted(
                day_model.items(),
                key = lambda pair: hour_key(pair[0]),
            )
        )

    week = sorted(
        ({
            'day': day,
            'schedule': day_schedule,
            'day-class': day_class(day.weekday()),
        } for day, day_schedule in week_model.items()),
        key = lambda day_entry: day_entry['day'],
    )
    return render_template(
        'stop_week.html',
        ref = bus_stop.code,
        name = tr(bus_stop.name, 'bus-stops'),
        tr = tr,
        week = week,
    )


@app.route('/trip/<trip_reference>')
def trip(trip_reference):
    """Render the halts of one trip, grouped by region or listed stop by stop."""
    from buses import all_trips
    from busroute import simplify_name
    try:
        trip = all_trips[trip_reference]
    except KeyError:
        abort(404)
    schedule = []
    region = ''
    # NOTE(review): if `profile` is a ConfigParser this value is a string and
    # any non-empty value (even 'false') counts as true - confirm.
    use_regions = profile['regions']['use-regions']
    for halt in trip.schedule:
        formatted_time = time_representation(_midnight_today() + halt.arrival_time)
        if use_regions:
            # A new group starts when the region changes, except that halts
            # without a region stay in the preceding named group. The
            # `not schedule` test guarantees that the first halt always gets a
            # group to be appended to.
            region_changed = halt.stop.region != region and not (region and not halt.stop.region)
            if not schedule or region_changed:
                if schedule and not schedule[-1]['name']:
                    # Give the so far unnamed group the name of this region.
                    schedule[-1]['name'] = tr(halt.stop.region or '', 'places')
                else:
                    schedule.append({
                        'name': tr(halt.stop.region or '', 'places'),
                        'time': formatted_time,
                        'stops': [],
                        'index': len(schedule),
                    })
                region = halt.stop.region
        else:
            schedule.append({
                'name': tr(halt.stop.name or '', 'bus-stops', 'places'),
                'time': formatted_time,
                'stops': [],
                'index': len(schedule),
            })
        schedule[-1]['stops'].append({
            'time': formatted_time,
            'id': halt.stop.reference,
            'code': halt.stop.code,
            'name': tr(halt.stop.name, 'bus-stops', 'places'),
        })
    places = trip.concise_schedule()
    try:
        endpoints = [simplify_name(places[0]), simplify_name(places[-1])]
    except IndexError:
        # No concise schedule available: use the first and the last stop.
        endpoints = [trip.schedule[0].stop.name, trip.schedule[-1].stop.name]
    return render_template('trip.html',
        schedule = schedule,
        trip_reference = trip_reference,
        route = trip.route.reference,
        description = ' - '.join(tr(place, 'places') for place in endpoints),
        night = is_night_time(_midnight_today() + trip.schedule[-1].arrival_time),
        tr = tr,
        length = trip.length / 1000,
    )


@app.route('/route/<name>')
def route_page(name):
    """Render the trips of a route that are served today; 404 if unknown."""
    from buses import routes
    try:
        route = routes[name.upper()]
    except KeyError:
        abort(404)
    schedule = []
    for route_trip in route.trips:
        # NOTE(review): this keeps trips whose last arrival is already in the
        # past - confirm that `<` (rather than `>`) is intended.
        if (route_trip.is_served_at(today())
                and _midnight_today() + route_trip.schedule[-1].arrival_time < now()):
            schedule.append({
                'name': route_trip.reference,
                'from': route_trip.from_place,
                'to': route_trip.to_place,
                'time': time_representation(_midnight_today() + route_trip.schedule[0].departure_time),
            })
    return render_template('route.html',
        name = route.reference + ' ' + route.description,
        tr = tr,
        schedule = schedule,
    )


@app.route('/')
def index():
    """Redirect the front page to the 'kauppatori' stop cluster."""
    return redirect(url_for('cluster_schedule', cluster_name = 'kauppatori'))


# The Finnish URLs below redirect to their English counterparts. url_for is
# used because a relative target would be resolved under the Finnish path.
@app.route('/pysäkki/<reference>')
def redirect_pysäkki(reference):
    """Redirect /pysäkki/<reference> to /stop/<reference>."""
    return redirect(url_for('bus_stop_schedule', reference = reference))


@app.route('/pysäkkiryhmä/<reference>')
def redirect_pysäkkiryhmä(reference):
    """Redirect /pysäkkiryhmä/<reference> to /stop_cluster/<reference>."""
    return redirect(url_for('cluster_schedule', cluster_name = reference))


@app.route('/ajovuoro/<reference>')
def redirect_ajovuoro(reference):
    """Redirect /ajovuoro/<reference> to /trip/<reference>."""
    return redirect(url_for('trip', trip_reference = reference))


@app.route('/static/<path:path>')
def static_file(path):
    """Serve a file from the 'static' directory."""
    # The parameter shadows os.path here, so the directory and the file name
    # are passed to send_from_directory separately instead of being joined.
    return send_from_directory('static', path)


from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument('gtfs_zip_path')
parser.add_argument('profile_path')
# Port and debug options only exist when the module is run as a script.
if __name__ == '__main__':
    parser.add_argument('-p', '--port', type = int, default = 5000)
    parser.add_argument('-d', '--debug', action = 'store_true')
# The command line is parsed and the data is loaded on import as well.
args = parser.parse_args()
profile.read(args.profile_path)
buses.load_buses(args.gtfs_zip_path)
if __name__ == '__main__':
    app.run(debug = args.debug, port = args.port)