bussit.py

Mon, 17 Apr 2017 14:51:22 +0300

author
Teemu Piippo <teemu@hecknology.net>
date
Mon, 17 Apr 2017 14:51:22 +0300
changeset 0
fc48613c73e5
child 2
48efa8ca14dd
permissions
-rwxr-xr-x

Alustava versio

#!/usr/bin/env python3
import enum
from datetime import date, time, datetime, timedelta
Suunta = enum.Enum('Suunta', [('Taaksepäin', 0), ('Eteenpäin', 1)])

def lue_csv(tiedosto, muunnokset = None):
	import csv
	lukija = csv.reader(tiedosto)
	otsakkeet = next(lukija)
	for rivi in lukija:
		tietue = dict(zip(otsakkeet, rivi))
		if muunnokset:
			for avain, muunnos in muunnokset.items():
				tietue[avain] = muunnos(tietue[avain])
		yield tietue

def muunna_ajovuoro_tunniste(tunniste):
	return int(tunniste.split('_')[0])

class Ajovuoro:
	def __init__(self, tunniste, linja, palvelu, kyltti, suunta):
		self.tunniste, self.linja, self.palvelu, self.kyltti, self.suunta = tunniste, linja, \
			palvelu, kyltti, suunta
		self.reitti = []
		self.nimi = muunna_ajovuoro_tunniste(tunniste)
	def __repr__(self):
		return 'ajot[%r]' % self.nimi
	def pysäkkiReitillä(self, pysäkki):
		for pysähdys in self.reitti:
			if pysähdys.pysäkki is pysäkki:
				return pysähdys
		else:
			return None
	def ajetaan_päivänä(self, päivä):
		try:
			return self.palvelu in palvelut_per_päivä[päivä]
		except KeyError:
			return False

class Linja:
	def __init__(self, tietue):
		self.tunniste = tietue['route_id']
		self.viite = tietue['route_short_name']
		self.selite = tietue['route_long_name']
	def __repr__(self):
		return 'linjat[%r]' % self.viite

class Palvelu:
	def __init__(self, tunniste):
		self.tunniste = tunniste
		self.päivät = set()
	def __repr__(self):
		return 'palvelut[%r]' % self.tunniste

class Pysäkki:
	def __init__(self, tunniste, nimi):
		self.tunniste, self.nimi = tunniste, nimi
	def __repr__(self):
		return 'pysäkit[%r]' % self.tunniste
	def aikataulu(self, määrä = 50):
		'''
			Hakee tämän pysäkin seuraavat `määrä` lähtöä. Päätepysäkille saapuvia busseja ei
			lasketa. Palauttaa pysähdykset listana jossa alkiot ovat muotoa (aika, pysähdys),
			jossa:
			- `aika` on saapumishetki muotoa datetime ja
			- `pysähdys` on vastaava Pysähdys olio.
			
			Mikäli pysäkille ei ole määrätty riittävästi pysähdyksiä kalenterissa, tuloslista
			jää alimittaiseksi, mahdollisesti jopa tyhjäksi.
		'''
		class PäivätLoppuError(Exception):
			pass
		# Hakee pysäkin aikataulut tiettynä päivänä.
		def aikataulu_päivänä(päivä):
			# Jos päädyttiin aikataulukalenterin ulkopuolelle, niin tuotetaan virhe. Jos vain
			# palautettaisiin tyhjä tulos, niin algoritmi jatkaisi etsintää loputtomiin.
			if päivä > viimeinen_käyttöpäivä:
				raise PäivätLoppuError()
			taulu = []
			# Jokaiselle ajovuorolle,
			for ajo in ajot.values():
				# jos tämä ajovuoro ajetaan tänä päivänä
				if ajo.ajetaan_päivänä(päivä):
					# ja jos tämä ajo pysähtyy tällä pysäkillä, ei kuitenkaan saapuen
					# päätepysäkille,
					pysähdys = ajo.pysäkkiReitillä(self)
					if pysähdys and pysähdys is not ajo.reitti[-1]:
						# ja jos tämä pysähdys on tulevaisuudessa,
						aika = datetime.combine(päivä, time()) + pysähdys.saapumisaika
						if aika >= datetime.now():
							# lisää pysähdys listaan.
							taulu.append((aika, pysähdys))
			# Lajittele lopputulos saapumisajan mukaan.
			taulu.sort(key = lambda tietue: tietue[0])
			return taulu
		taulu = []
		päivä = date.today()
		# Niin kauan kuin aikatauluja ei ole vielä tarpeeksi,
		while len(taulu) < määrä:
			try:
				# hae nykyisen päivän aikataulut ja lisää ne,
				taulu += aikataulu_päivänä(päivä)
			except PäivätLoppuError:
				# paitsi jos mentiin kalenterin ulkopuolelle, jolloin lopetetaan,
				break
			# ja siirry seuraavaan päivään.
			päivä += timedelta(1)
		# Typistä lopputulos haluttuun tulosmäärään.
		return taulu[:määrä]

class Pysähdys:
	def __init__(self, saapumisaika, lähtöaika, pysäkki, ajo):
		self.saapumisaika, self.lähtöaika, self.pysäkki, self.ajo = saapumisaika, lähtöaika, \
			pysäkki, ajo
	def __repr__(self):
		return 'Pysähdys(%r, %r, %r, %r)' % (self.saapumisaika, self.lähtöaika, self.pysäkki, self.ajo)

linjat = {}
linjat_per_tunniste = {}
ajot = {}
ajot_per_numero = {}
palvelut = {}
pysäkit = {}

print('Ladataan linjat... ', end = '', flush = True)
with open('gtfs/routes.txt') as tiedosto:
	for rivi in lue_csv(tiedosto):
		linja = Linja(rivi)
		linja.tunniste = linja.tunniste
		linjat[linja.viite] = linja
		linjat_per_tunniste[linja.tunniste] = linja
print('%d linjaa' % len(linjat))

print('Ladataan ajot... ', end = '', flush = True)
with open('gtfs/trips.txt') as tiedosto:
	for rivi in lue_csv(tiedosto, muunnokset = {'direction_id': lambda k: Suunta(int(k))}):
		if rivi['service_id'] not in palvelut:
			palvelut[rivi['service_id']] = Palvelu(rivi['service_id'])
		linja = linjat_per_tunniste[rivi['route_id']]
		ajo = Ajovuoro(tunniste = rivi['trip_id'],
			linja = linja,
			palvelu = palvelut[rivi['service_id']],
			kyltti = rivi['trip_headsign'],
			suunta = rivi['direction_id'])
		assert ajo.nimi not in ajot
		ajot[ajo.nimi] = ajo
print('%d ajoa' % len(ajot))

def lue_päiväys(teksti):
	return date(int(teksti[:4]), int(teksti[4:6]), int(teksti[6:]))

def lue_aika(teksti):
	tunti, minuutti, sekunti = map(int, teksti.split(':'))
	return timedelta(hours = tunti, minutes = minuutti, seconds = sekunti)

print('Ladataan päiväykset... ', flush = True)

viimeinen_käyttöpäivä = date.today()
palvelut_per_päivä = {}

with open('gtfs/calendar_dates.txt') as tiedosto:
	for rivi in lue_csv(tiedosto):
		palvelu = palvelut[rivi['service_id']]
		päivä = lue_päiväys(rivi['date'])
		palvelu.päivät.add(päivä)
		if päivä not in palvelut_per_päivä:
			palvelut_per_päivä[päivä] = set()
		palvelut_per_päivä[päivä].add(palvelu)
		viimeinen_käyttöpäivä = max(päivä, viimeinen_käyttöpäivä)

def palvelut_käytössä(päivä):
	for palvelu in palvelut.values():
		if päivä in palvelu.päivät:
			yield palvelu

print('Ladataan pysäkit... ', end = '', flush = True)
with open('gtfs/stops.txt') as file:
	for rivi in lue_csv(file):
		pysäkki = Pysäkki(rivi['stop_id'], rivi['stop_name'])
		pysäkit[pysäkki.tunniste] = pysäkki
print('%d pysäkkiä' % len(pysäkit))

print('Ladataan aikataulut... ', end = '', flush = True)
with open('gtfs/stop_times.txt') as file:
	rivimäärä = sum(line.count('\n') for line in file)
	laskettu = 0
	file.seek(0)
	for rivi in lue_csv(file):
		ajo = ajot[muunna_ajovuoro_tunniste(rivi['trip_id'])]
		saapumisaika = lue_aika(rivi['arrival_time'])
		lähtöaika = lue_aika(rivi['departure_time'])
		pysäkki = pysäkit[rivi['stop_id']]
		ajo.reitti.append(Pysähdys(saapumisaika, lähtöaika, pysäkki, ajo))
		laskettu += 1
		if laskettu % 1000 == 0:
			print('\rLadataan aikataulut... %.1f%%' % (laskettu * 100 / rivimäärä), end = ' ')
print('\rLadataan aikataulut... ladattu')

mercurial