dublinbus-display/gtfs_client.py

407 lines
17 KiB
Python

from arrival_times import ArrivalTime
import datetime
import gc
import gtfs_kit as gk
import json
import os
import pandas as pd
import queue
import refresh_feed
import requests
import time
import threading
import traceback
import zipfile
class GTFSClient():
    """
    Combines a static GTFS feed with GTFS-Realtime (GTFS-R) updates to work
    out the next arrivals at a configured set of stops.

    On construction the local copy of the feed is refreshed and the subset of
    the feed needed for the requested stops is loaded into memory. Calling
    start() then refreshes the arrivals periodically and publishes each new
    list of ArrivalTime objects on the update queue.
    """

    # Number of scheduled arrivals fetched on each refresh. This is more than
    # is published so that canceled trips can be dropped without running short.
    CANDIDATE_ARRIVALS = 15
    # Number of arrivals published on each refresh.
    MAX_ARRIVALS = 5
    # Seconds to wait for the GTFS-R API before giving up on a poll.
    GTFS_R_TIMEOUT_SECONDS = 30
    # calendar.txt weekday column names, indexed by datetime.weekday().
    # A fixed table avoids depending on the process locale (strftime("%A")).
    _WEEKDAY_COLUMNS = ("monday", "tuesday", "wednesday", "thursday",
                        "friday", "saturday", "sunday")

    def __init__(self, feed_url: str, gtfs_r_url: str, gtfs_r_api_key: str,
                 stop_codes: list[str], routes_for_stops: dict,
                 update_queue: queue.Queue, update_interval_seconds: int = 60):
        """
        Refresh and load the feed, and prepare (but do not start) the
        periodic refresh thread.

        feed_url: URL of the GTFS zip file; its last path component is used
            as the local file name.
        gtfs_r_url / gtfs_r_api_key: GTFS-R endpoint and API key. An empty key
            makes the client read "example.json" instead of calling the API.
        stop_codes: stop codes (stops.txt "stop_code") to report on.
        routes_for_stops: mapping of stop code to the route short names that
            should be shown for that stop. Stops without an entry show every
            route.
        update_queue: queue that receives the arrivals after each refresh.
        update_interval_seconds: seconds between background refreshes.
        """
        self.stop_codes = stop_codes
        self.routes_for_stops = routes_for_stops
        self.gtfs_r_url = gtfs_r_url
        self.gtfs_r_api_key = gtfs_r_api_key

        # Make sure that the feed file is up to date
        feed_name = feed_url.split('/')[-1]
        try:
            last_mtime = os.stat(feed_name).st_mtime
        except OSError:
            # No usable local copy yet
            last_mtime = 0
        refresh_feed.update_local_file_from_url_v1(last_mtime, feed_name, feed_url)

        # Load the feed
        self.feed = self._read_feed(feed_name, dist_units='km', stop_codes=stop_codes)
        gc.collect()
        self.stop_ids = self.__wanted_stop_ids()
        self.deltas = {}
        self.canceled_trips = set()
        self.added_stops = []

        # Schedule refresh. The attributes always exist so that start() can
        # tell whether a background thread was configured.
        self._update_queue = update_queue
        self._update_interval_seconds = None
        self._refresh_thread = None
        if update_interval_seconds and update_queue:
            self._update_interval_seconds = update_interval_seconds
            self._refresh_thread = threading.Thread(
                target=every, args=(update_interval_seconds, self.refresh))

    def _read_feed(self, path: "gk.Path", dist_units: str, stop_codes: list[str]) -> "gk.Feed":
        """
        NOTE: This helper method was extracted from gtfs_kit.feed to modify it
        to only load the stop_times for the stops we are interested in,
        because loading the entire feed would use more memory than the Raspberry Pi Zero W has.
        This version also reads CSV data straight from the zip file to avoid
        wearing out the Pi's SD card.

        Raises ValueError if the path does not exist or stops.txt has no rows.
        """
        FILES_TO_LOAD = [
            # List of feed files to load. stop_times.txt is loaded separately.
            'trips.txt',
            'routes.txt',
            'calendar.txt',
            'calendar_dates.txt',
            'stops.txt',
            'agency.txt'
        ]
        path = gk.Path(path)
        if not path.exists():
            raise ValueError(f"Path {path} does not exist")
        gc.collect()

        feed_dict = {table: None for table in gk.cs.GTFS_REF["table"]}
        with zipfile.ZipFile(path) as z:
            for filename in FILES_TO_LOAD:
                table = filename.split(".")[0]
                with z.open(filename) as f:
                    df = pd.read_csv(f, dtype=gk.cs.DTYPE, encoding="utf-8-sig")
                    if not df.empty:
                        feed_dict[table] = gk.cn.clean_column_names(df)
                    gc.collect()

            # Finally, load stop_times.txt
            # Obtain the list of IDs of the desired stops. This is similar to what __wanted_stop_ids() does,
            # but without a dependency on a fully formed feed object
            stops = feed_dict.get("stops")
            if stops is None:
                raise ValueError(f"stops.txt in {path} is empty")
            wanted_stop_ids = stops[stops["stop_code"].isin(stop_codes)]["stop_id"]

            with z.open("stop_times.txt") as f:
                # Read in chunks and keep only the wanted rows so that the
                # whole table is never held in memory at once.
                iter_csv = pd.read_csv(f, iterator=True, chunksize=1000,
                                       dtype=gk.cs.DTYPE, encoding="utf-8-sig")
                chunks = [chunk[chunk["stop_id"].isin(wanted_stop_ids)] for chunk in iter_csv]
                df = pd.concat(chunks) if chunks else pd.DataFrame()
                del chunks
                gc.collect()
                if not df.empty:
                    # Left-pad times such as "9:05:00" to "09:05:00" so that
                    # plain string comparisons order them correctly.
                    df["arrival_time"] = df["arrival_time"].str.zfill(8)
                    gc.collect()
                    df["departure_time"] = df["departure_time"].str.zfill(8)
                    gc.collect()
                    feed_dict["stop_times"] = gk.cn.clean_column_names(df)
                gc.collect()

        feed_dict["dist_units"] = dist_units
        # Create feed
        return gk.Feed(**feed_dict)

    def __wanted_stop_ids(self) -> pd.Series:
        """
        Return a Series with the stop IDs of the stops whose stop code is in
        self.stop_codes. Raises an Exception if none of them is in the feed.
        """
        stops = self.feed.stops
        wanted = stops[stops["stop_code"].isin(self.stop_codes)]
        if wanted.empty:
            raise Exception("Stops is empty!")
        return wanted["stop_id"]

    def __service_ids_active_at(self, when: datetime.datetime) -> pd.DataFrame:
        """
        Returns the calendar.txt rows of the services active on the date of `when`.

        NOTE: exceptions listed in calendar_dates.txt are not applied here.
        """
        service_date = when.strftime("%Y%m%d")
        weekday_column = self._WEEKDAY_COLUMNS[when.weekday()]
        calendar = self.feed.calendar
        # Dates are YYYYMMDD strings, so string comparison orders them correctly
        in_date_range = (calendar["start_date"] <= service_date) & (calendar["end_date"] >= service_date)
        return calendar[in_date_range & (calendar[weekday_column] == 1)]

    def __current_calendars(self) -> pd.DataFrame:
        """
        Filter the calendar entries to find all services that apply for today.
        Returns an empty DataFrame if none do.
        """
        # Take the service IDs active today
        now = datetime.datetime.now()
        now_active = self.__service_ids_active_at(now)
        if now_active.empty:
            print("There are no service IDs for today!")

        # NOTE: tomorrow's services are only checked to log a warning; they
        # are currently not merged into the result.
        tomorrow_active = self.__service_ids_active_at(now + datetime.timedelta(days=1))
        if tomorrow_active.empty:
            print("There are no service IDs for tomorrow!")

        active_calendars = now_active
        if active_calendars.empty:
            print("The concatenation of today and tomorrow's calendars is empty. This should not happen.")
        return active_calendars

    def __current_service_ids(self) -> pd.Series:
        """
        Filter the calendar entries to find all service ids that apply for today.
        Returns an empty Series if none do.
        """
        return self.__current_calendars()["service_id"]

    def __trip_ids_for_service_ids(self, service_ids: pd.Series) -> pd.Series:
        """
        Returns a Series with the trip IDs for the given service IDs
        """
        trips = self.feed.trips
        matching = trips[trips["service_id"].isin(service_ids)]
        if matching.empty:
            print("There are no active trips!")
        return matching["trip_id"]

    def __next_n_buses(self,
                       trip_ids: pd.Series,
                       n: int) -> pd.DataFrame:
        """
        Returns the first n scheduled stop times (trip_id, arrival_time,
        stop_id) at the wanted stops that belong to the given trips and are
        later than the current wall-clock time, ordered by arrival time.
        """
        current_time = datetime.datetime.now().strftime("%H:%M:%S")
        stop_times = self.feed.stop_times
        upcoming = stop_times[stop_times["stop_id"].isin(self.stop_ids)
                              & stop_times["trip_id"].isin(trip_ids)
                              & (stop_times["arrival_time"] > current_time)]
        upcoming = upcoming.sort_values("arrival_time")
        return upcoming[:n][["trip_id", "arrival_time", "stop_id"]]

    def __join_data(self, next_buses: pd.DataFrame) -> pd.DataFrame:
        """
        Enriches the stop data with the information from other dataframes in the feed
        (trips, stops and routes, in that order).
        """
        feed = self.feed
        with_trips = next_buses.join(feed.trips.set_index("trip_id"), on="trip_id")
        with_stops = with_trips.join(feed.stops.set_index("stop_id"), on="stop_id")
        return with_stops.join(feed.routes.set_index("route_id"), on="route_id")

    def __routes_for_stop(self, stop_code) -> list:
        """
        Returns the routes configured for a stop code, or an empty list when
        the stop has no restriction. The integer form of the code is looked up
        first, then its string form, so both kinds of keys are supported.
        """
        try:
            routes = self.routes_for_stops.get(int(stop_code))
        except (TypeError, ValueError):
            # Non-numeric (or missing) stop code: only the string key can match
            routes = None
        if routes is None:
            routes = self.routes_for_stops.get(str(stop_code))
        return routes or []

    def __filter_routes_by_stops(self, next_buses: pd.DataFrame) -> pd.DataFrame:
        """
        Takes a dataframe of a set of bus arrivals and only keeps the routes we are interested in
        for the given stops (this is to eliminate routes that stop in more than one of our stops).

        The dataframe is modified in place and also returned.
        """
        ids_to_delete = []
        for index, next_bus in next_buses.iterrows():
            routes_for_stop = self.__routes_for_stop(next_bus["stop_code"])
            if routes_for_stop and next_bus["route_short_name"] not in routes_for_stop:
                # we should not show this entry. Note the ID
                ids_to_delete.append(index)
        next_buses.drop(index=ids_to_delete, inplace=True)
        return next_buses

    def __time_to_seconds(self, s: str) -> int:
        """
        Converts a "hh:mm:ss" string into a number of seconds.
        Returns 0 (and logs a message) if the string does not have three fields.
        """
        fields = s.split(":")
        if len(fields) != 3:
            print("Malformed timestamp:", s)
            return 0
        hours, minutes, seconds = fields
        return int(hours) * 3600 + int(minutes) * 60 + int(seconds)

    def __due_in_seconds(self, time_str: str) -> int:
        """
        Returns the number of seconds in the future that the time_str (format hh:mm:ss) is
        """
        now = datetime.datetime.now().strftime("%H:%M:%S")
        tnow = self.__time_to_seconds(now)
        tstop = self.__time_to_seconds(time_str)
        # ">=" so that a bus due this very second is reported as due in 0
        # seconds rather than in 24 hours.
        if tstop >= tnow:
            return tstop - tnow
        # If the stop time is less than the current time, the stop is tomorrow
        return tstop + 86400 - tnow

    def __lookup_headsign_by_route(self, route_id: str, direction_id: int) -> str:
        """
        Look up a destination string in Trips from the route and direction.
        Returns an empty string if no trip matches.
        """
        trips = self.feed.trips
        matches = trips[(trips["route_id"] == route_id) & (trips["direction_id"] == direction_id)]
        if matches.empty:
            return ""
        return matches["trip_headsign"].iloc[0]

    def __fetch_gtfsr_json(self):
        """
        Returns the decoded GTFS-R payload, or None if the API could not be
        reached or answered with a non-OK status. With an empty API key the
        payload is read from the local "example.json" file instead.
        """
        if self.gtfs_r_api_key == "":
            with open("example.json") as f:
                return json.load(f)

        headers = {"x-api-key": self.gtfs_r_api_key}
        try:
            # Without a timeout a stalled connection would block refreshes forever
            response = requests.get(url=self.gtfs_r_url, headers=headers,
                                    timeout=self.GTFS_R_TIMEOUT_SECONDS)
        except requests.RequestException as error:
            print("GTFS-R request failed: {}".format(error))
            return None
        if response.status_code != 200:
            print("GTFS-R sent non-OK response: {}\n{}".format(response.status_code, response.text))
            return None
        return json.loads(response.text)

    def __poll_gtfsr_deltas(self) -> tuple[dict, set, list]:
        """
        Poll the GTFS-R API and summarise the updates.

        Returns a tuple (deltas, canceled_trips, added_stops):
          - deltas: {trip_id: {stop_id: delay}} for SCHEDULED trips
          - canceled_trips: set of trip IDs marked as CANCELED
          - added_stops: ArrivalTime objects for ADDED trips calling at one of
            the wanted stops
        All three are empty if the API could not be polled.
        """
        deltas_json = self.__fetch_gtfsr_json()
        if deltas_json is None:
            return {}, set(), []

        deltas = {}
        canceled_trips = set()
        added_stops = []

        # Pre-compute some data to use for added trips:
        relevant_service_ids = self.__current_service_ids()
        relevant_trips = self.feed.trips[self.feed.trips["service_id"].isin(relevant_service_ids)]
        relevant_route_ids = set(relevant_trips["route_id"])
        today = datetime.date.today().strftime("%Y%m%d")
        current_time = datetime.datetime.now().strftime("%H:%M:%S")
        # A set of the stop ID *values*: "x in series" would test the index
        # labels of the Series instead of its contents.
        wanted_stop_ids = set(self.stop_ids)

        for e in deltas_json.get("entity", []):
            try:
                trip_update = e.get("trip_update")
                if trip_update is None:
                    # Not a trip update entity; nothing to do
                    continue
                trip = trip_update.get("trip")
                trip_id = trip.get("trip_id")
                trip_action = trip.get("schedule_relationship")

                if trip_action == "SCHEDULED":
                    for u in trip_update.get("stop_time_update", []):
                        delay = u.get("arrival", u.get("departure", {})).get("delay", 0)
                        deltas.setdefault(trip_id, {})[u.get("stop_id")] = delay

                elif trip_action == "ADDED":
                    start_date = trip.get("start_date")
                    start_time = trip.get("start_time")
                    route_id = trip.get("route_id")
                    direction_id = trip.get("direction_id")
                    # Check if the route is part of the routes we care about
                    if route_id not in relevant_route_ids:
                        continue
                    # Skip trips that start after today or later than the current time
                    if start_date > today or start_time > current_time:
                        continue
                    # Look for the entry for any of the stops we want
                    for stop_time_update in trip_update.get("stop_time_update", []):
                        stop_id = stop_time_update.get("stop_id", "")
                        if stop_id not in wanted_stop_ids:
                            continue
                        arrival_time = int(stop_time_update.get(
                            "arrival", stop_time_update.get("departure", {})).get("time", 0))
                        now_ts = int(time.time())
                        if arrival_time < now_ts:
                            continue
                        routes = self.feed.routes
                        new_arrival = ArrivalTime(
                            stop_id=stop_id,
                            route_id=routes[routes["route_id"] == route_id]["route_short_name"].item(),
                            destination=self.__lookup_headsign_by_route(route_id, direction_id),
                            due_in_seconds=arrival_time - now_ts,
                            is_added=True
                        )
                        print("Added route:", new_arrival)
                        added_stops.append(new_arrival)

                elif trip_action == "CANCELED":
                    canceled_trips.add(trip_id)

                else:
                    print("Unsupported action:", trip_action)
            except Exception:
                # Log the offending entity, then let the caller see the error
                print("Error parsing GTFS-R entry:", str(e))
                raise

        return deltas, canceled_trips, added_stops

    def get_next_n_buses(self, num_entries: int) -> pd.DataFrame:
        """
        Returns a dataframe with the information of the next N buses arriving at the requested stops.
        Fewer rows may be returned, because the per-stop route filter is applied after taking
        the next N scheduled arrivals.
        """
        service_ids = self.__current_service_ids()
        trip_ids = self.__trip_ids_for_service_ids(service_ids)
        next_buses = self.__next_n_buses(trip_ids, num_entries)
        joined_data = self.__join_data(next_buses)
        # Filters joined_data in place
        self.__filter_routes_by_stops(joined_data)
        return joined_data

    def start(self) -> None:
        """ Start the refresh thread (if one was configured) and do a first refresh """
        if self._refresh_thread is not None:
            self._refresh_thread.start()
        self.refresh()

    def refresh(self):
        """
        Create and enqueue the refreshed stop data.

        Returns the list of (at most MAX_ARRIVALS) ArrivalTime objects that was
        also put on the update queue, if there is one.
        """
        # Retrieve the GTFS-R deltas
        deltas, canceled_trips, added_stops = self.__poll_gtfsr_deltas()
        if deltas or canceled_trips or added_stops:
            # Only update deltas and canceled trips if the API returns data
            self.deltas = deltas
            self.canceled_trips = canceled_trips
            self.added_stops = added_stops

        arrivals = []
        # take more entries than we need in case there are cancelations
        buses = self.get_next_n_buses(self.CANDIDATE_ARRIVALS)
        for _, bus in buses.iterrows():
            if bus["trip_id"] in self.canceled_trips:
                continue
            delta = self.deltas.get(bus["trip_id"], {}).get(bus["stop_id"], 0)
            if delta != 0:
                print("Delta for route {} stop {} is {}".format(bus["route_short_name"], bus["stop_id"], delta))
            arrivals.append(ArrivalTime(
                stop_id=bus["stop_id"],
                route_id=bus["route_short_name"],
                destination=bus["trip_headsign"],
                due_in_seconds=self.__due_in_seconds(bus["arrival_time"]) + delta,
                is_added=False
            ))

        if self.added_stops:
            # Append the added stops from GTFS-R and re-sort
            arrivals.extend(self.added_stops)
            arrivals.sort()

        # Select the first few of what remains
        arrivals = arrivals[:self.MAX_ARRIVALS]
        if self._update_queue:
            self._update_queue.put(arrivals)
        gc.collect()
        return arrivals
def every(delay, task) -> None:
    """ Auxiliary function to schedule updates: runs task() every `delay`
    seconds, forever, starting one `delay` from now.
    Taken from https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
    """
    deadline = time.time() + delay
    while True:
        remaining = deadline - time.time()
        time.sleep(remaining if remaining > 0 else 0)
        try:
            task()
        except Exception:
            # A failing task must not stop the schedule; report it and carry on.
            # in production code you might want to have this instead of course:
            # logger.exception("Problem while executing repetitive task.")
            traceback.print_exc()
        # skip tasks if we are behind schedule:
        # jump over every whole period that has already elapsed, then move
        # on to the following one
        missed_periods = (time.time() - deadline) // delay
        deadline += missed_periods * delay + delay