dublinbus-display/gtfs_client.py

386 lines
16 KiB
Python

from arrival_times import ArrivalTime
import datetime
import gc
import gtfs_kit as gk
import json
import os
import pandas as pd
import queue
import refresh_feed
import requests
import time
import threading
import traceback
import zipfile
class GTFSClient:
    """
    Combines a static GTFS feed with GTFS-Realtime (GTFS-R) trip updates to
    produce the next arrivals at a configured set of stops.
    """

    # Seconds to wait for the GTFS-R API before giving up, so that a stalled
    # connection cannot block the refresh thread forever.
    GTFS_R_TIMEOUT_SECONDS = 30

    # Calendar column names indexed by datetime.weekday() (Monday == 0). Used instead of
    # strftime("%A"), whose output depends on the process locale.
    _WEEKDAY_COLUMNS = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")

    def __init__(self, feed_url: str, gtfs_r_url: str, gtfs_r_api_key: str,
                 stop_codes: list[str], update_queue: queue.Queue, update_interval_seconds: int = 60):
        """
        Refresh the local copy of the static feed if needed, load it, and prepare the refresh thread.

        :param feed_url: URL of the static GTFS zip; the local file name is the URL's last path segment.
        :param gtfs_r_url: URL of the GTFS-R endpoint.
        :param gtfs_r_api_key: API key sent as the ``x-api-key`` header; if empty,
            ``example.json`` is read instead of calling the API.
        :param stop_codes: values of the ``stop_code`` column of stops.txt to report arrivals for.
        :param update_queue: queue that receives the list of arrivals after every refresh.
        :param update_interval_seconds: refresh period in seconds. If this is falsy or there is no
            queue, no refresh thread is created and ``start()`` only refreshes once.
        """
        self.stop_codes = stop_codes
        feed_name = feed_url.split('/')[-1]
        self.gtfs_r_url = gtfs_r_url
        self.gtfs_r_api_key = gtfs_r_api_key
        # Make sure that the feed file is up to date
        try:
            last_mtime = os.stat(feed_name).st_mtime
        except OSError:
            # No (readable) local copy: report mtime 0 to refresh_feed
            last_mtime = 0
        refreshed, _ = refresh_feed.update_local_file_from_url_v1(last_mtime, feed_name, feed_url)
        if refreshed:
            print("The feed file was refreshed.")
        else:
            print("The feed file was up to date")
        # Load the feed (only the parts we need, see _read_feed)
        self.feed = self._read_feed(feed_name, dist_units='km', stop_codes=stop_codes)
        gc.collect()
        self.stop_ids = self.__wanted_stop_ids()
        # Latest GTFS-R data, kept between refreshes:
        # {trip_id: {stop_id: delay seconds}}, canceled trip IDs, and ArrivalTime objects of added trips
        self.deltas = {}
        self.canceled_trips = set()
        self.added_stops = []
        # Schedule refresh
        self._update_queue = update_queue
        self._refresh_thread = None
        if update_interval_seconds and update_queue:
            self._update_interval_seconds = update_interval_seconds
            self._refresh_thread = threading.Thread(target=lambda: every(update_interval_seconds, self.refresh))

    def _read_feed(self, path: gk.Path, dist_units: str, stop_codes: list[str]) -> gk.Feed:
        """
        Load a GTFS zip into a gk.Feed, keeping only the stop_times rows of the wanted stops.

        NOTE: This helper method was extracted from gtfs_kit.feed to modify it
        to only load the stop_times for the stops we are interested in,
        because loading the entire feed would use more memory than the Raspberry Pi Zero W has.
        This version also reads CSV data straight from the zip file to avoid
        wearing out the Pi's SD card.

        :param path: path of the GTFS zip file.
        :param dist_units: distance units passed through to gk.Feed.
        :param stop_codes: stop codes whose stop_times rows are kept.
        :returns: the feed; tables whose file is missing or empty are left as None.
        :raises ValueError: if ``path`` does not exist.
        """
        FILES_TO_LOAD = [
            # List of feed files to load. stop_times.txt is loaded separately.
            'trips.txt',
            'routes.txt',
            'calendar.txt',
            'calendar_dates.txt',
            'stops.txt',
            'agency.txt'
        ]
        path = gk.Path(path)
        if not path.exists():
            raise ValueError(f"Path {path} does not exist")
        gc.collect()
        feed_dict = {table: None for table in gk.cs.GTFS_REF["table"]}
        with zipfile.ZipFile(path) as z:
            present = set(z.namelist())
            for filename in FILES_TO_LOAD:
                # Some GTFS files (e.g. calendar_dates.txt) are optional; leave their table as None
                if filename not in present:
                    continue
                table = filename.split(".")[0]
                with z.open(filename) as f:
                    df = pd.read_csv(f, dtype=gk.cs.DTYPE, encoding="utf-8-sig")
                if not df.empty:
                    feed_dict[table] = gk.cn.clean_column_names(df)
                gc.collect()
            # Finally, load stop_times.txt.
            # Obtain the IDs of the desired stops. This is similar to what __wanted_stop_ids() does,
            # but without a dependency on a fully formed feed object
            stops = feed_dict["stops"]
            wanted_stop_ids = stops.loc[stops["stop_code"].isin(stop_codes), "stop_id"]
            with z.open("stop_times.txt") as f:
                # Read in chunks and filter each one so the full table is never held in memory
                iter_csv = pd.read_csv(f, iterator=True, chunksize=1000, dtype=gk.cs.DTYPE, encoding="utf-8-sig")
                chunks = [chunk[chunk["stop_id"].isin(wanted_stop_ids)] for chunk in iter_csv]
                # pd.concat raises on an empty list (stop_times.txt without rows)
                df = pd.concat(chunks) if chunks else pd.DataFrame()
            gc.collect()
            if not df.empty:
                # Zero-pad H:MM:SS to HH:MM:SS so that plain string comparisons against
                # strftime("%H:%M:%S") work. Missing (NaN) times stay NaN instead of crashing.
                df["arrival_time"] = df["arrival_time"].str.zfill(8)
                gc.collect()
                df["departure_time"] = df["departure_time"].str.zfill(8)
                gc.collect()
                feed_dict["stop_times"] = gk.cn.clean_column_names(df)
            gc.collect()
        feed_dict["dist_units"] = dist_units
        # Create feed
        return gk.Feed(**feed_dict)

    def __wanted_stop_ids(self) -> pd.Series:
        """
        Return the ``stop_id`` values of the stops whose ``stop_code`` is in ``self.stop_codes``.

        :raises Exception: if none of the stop codes is present in the feed.
        """
        stops = self.feed.stops
        matching = stops[stops["stop_code"].isin(self.stop_codes)]
        if matching.empty:
            raise Exception("Stops is empty!")
        return matching["stop_id"]

    def __service_ids_active_at(self, when: datetime.datetime) -> pd.DataFrame:
        """
        Return the calendar.txt rows of the services that run on the date of ``when``.

        Only calendar.txt is consulted; calendar_dates.txt exceptions are not applied.
        Dates are compared as "YYYYMMDD" strings.
        """
        day = when.strftime("%Y%m%d")
        weekday = self._WEEKDAY_COLUMNS[when.weekday()]
        calendar = self.feed.calendar
        mask = ((calendar["start_date"] <= day)
                & (calendar["end_date"] >= day)
                & (calendar[weekday] == 1))
        return calendar[mask]

    def __current_calendars(self) -> pd.DataFrame:
        """
        Return the calendar rows of the services active today (empty DataFrame if none).

        Tomorrow's services are looked up only to print a warning when there are none;
        they are not merged into the result.
        """
        now = datetime.datetime.now()
        now_active = self.__service_ids_active_at(now)
        if now_active.empty:
            print("There are no service IDs for today!")
        tomorrow_active = self.__service_ids_active_at(now + datetime.timedelta(days=1))
        if tomorrow_active.empty:
            print("There are no service IDs for tomorrow!")
        # NOTE: merging tomorrow's services (for trips that spill past midnight) is disabled;
        # only today's calendar is used.
        active_calendars = now_active
        if active_calendars.empty:
            print("The concatenation of today and tomorrow's calendars is empty. This should not happen.")
        return active_calendars

    def __current_service_ids(self) -> pd.Series:
        """
        Return the service IDs active today (an empty Series if none are).
        """
        return self.__current_calendars()["service_id"]

    def __trip_ids_for_service_ids(self, service_ids: pd.Series) -> pd.Series:
        """
        Return the trip IDs of all trips that belong to one of ``service_ids``.
        """
        trips = self.feed.trips[self.feed.trips["service_id"].isin(service_ids)]
        if trips.empty:
            print("There are no active trips!")
        return trips["trip_id"]

    def __next_n_buses(self, trip_ids: pd.Series, n: int) -> pd.DataFrame:
        """
        Return up to ``n`` stop_times rows (trip_id, arrival_time, stop_id) at the wanted stops,
        restricted to ``trip_ids`` and to arrival times later than now, sorted by arrival_time.

        Times are compared as zero-padded "HH:MM:SS" strings (see _read_feed), so times of
        24:00:00 and later always count as upcoming.
        """
        current_time = datetime.datetime.now().strftime("%H:%M:%S")
        stop_times = self.feed.stop_times
        mask = (stop_times["stop_id"].isin(self.stop_ids)
                & stop_times["trip_id"].isin(trip_ids)
                & (stop_times["arrival_time"] > current_time))
        next_stops = stop_times[mask].sort_values("arrival_time")
        return next_stops.head(n)[["trip_id", "arrival_time", "stop_id"]]

    def __join_data(self, next_buses: pd.DataFrame) -> pd.DataFrame:
        """
        Enrich the stop rows with the columns of the matching trip, stop and route.
        """
        joined_data = (next_buses
                       .join(self.feed.trips.set_index("trip_id"), on="trip_id")
                       .join(self.feed.stops.set_index("stop_id"), on="stop_id")
                       .join(self.feed.routes.set_index("route_id"), on="route_id"))
        return joined_data

    def __time_to_seconds(self, s: str) -> int:
        """
        Convert "H:MM:SS"/"HH:MM:SS" (hours may exceed 23) to seconds; returns 0 if malformed.
        """
        sx = s.split(":")
        if len(sx) != 3:
            print("Malformed timestamp:", s)
            return 0
        return int(sx[0]) * 3600 + int(sx[1]) * 60 + int(sx[2])

    def __due_in_seconds(self, time_str: str) -> int:
        """
        Return how many seconds from now the time of day ``time_str`` (format hh:mm:ss) is.

        A time earlier than now is taken to be tomorrow; a time equal to now is due now (0).
        """
        now = datetime.datetime.now().strftime("%H:%M:%S")
        tnow = self.__time_to_seconds(now)
        tstop = self.__time_to_seconds(time_str)
        if tstop >= tnow:
            return tstop - tnow
        # The stop time has already passed today, so the stop is tomorrow
        return tstop + 86400 - tnow

    def __lookup_headsign_by_route(self, route_id: str, direction_id: int) -> str:
        """
        Return the trip_headsign of the first trip with this route and direction.

        :raises ValueError: if no trip matches (``Series.item()`` on an empty selection).
        """
        trips = self.feed.trips
        matching = trips[(trips["route_id"] == route_id) & (trips["direction_id"] == direction_id)]
        return matching.head(1)["trip_headsign"].item()

    def __poll_gtfsr_deltas(self) -> tuple[dict, set, list]:
        """
        Fetch GTFS-R trip updates and split them into delays, cancellations and added arrivals.

        If no API key is configured, the updates are read from ``example.json`` instead.

        :returns: ``(deltas, canceled_trips, added_stops)``: ``deltas`` maps
            trip_id -> {stop_id: delay}, ``canceled_trips`` is a set of trip IDs and
            ``added_stops`` is a list of ArrivalTime for ADDED trips at the wanted stops.
            All three are empty if the API answers with a non-200 status.
        :raises: whatever ``requests`` raises on network errors, and any error hit while
            parsing an entity (the entity is printed first).
        """
        # Poll GTFS-R API
        if self.gtfs_r_api_key != "":
            headers = {"x-api-key": self.gtfs_r_api_key}
            response = requests.get(url=self.gtfs_r_url, headers=headers,
                                    timeout=self.GTFS_R_TIMEOUT_SECONDS)
            if response.status_code != 200:
                print("GTFS-R sent non-OK response: {}\n{}".format(response.status_code, response.text))
                return {}, set(), []
            deltas_json = json.loads(response.text)
        else:
            with open("example.json", encoding="utf-8") as f:
                deltas_json = json.load(f)
        deltas = {}
        canceled_trips = set()
        added_stops = []
        # Pre-compute data used to filter added trips
        relevant_service_ids = self.__current_service_ids()
        relevant_trips = self.feed.trips[self.feed.trips["service_id"].isin(relevant_service_ids)]
        relevant_route_ids = set(relevant_trips["route_id"])
        # Use a set of the values: `x in series` would test the Series *index*, not its values
        wanted_stop_ids = set(self.stop_ids)
        today = datetime.date.today().strftime("%Y%m%d")
        current_time = datetime.datetime.now().strftime("%H:%M:%S")
        for e in deltas_json.get("entity", []):
            try:
                trip_update = e.get("trip_update")
                trip = trip_update.get("trip")
                trip_id = trip.get("trip_id")
                trip_action = trip.get("schedule_relationship")
                if trip_action == "SCHEDULED":
                    for u in trip_update.get("stop_time_update", []):
                        delay = u.get("arrival", u.get("departure", {})).get("delay", 0)
                        deltas.setdefault(trip_id, {})[u.get("stop_id")] = delay
                elif trip_action == "ADDED":
                    start_date = trip.get("start_date")
                    start_time = trip.get("start_time")
                    route_id = trip.get("route_id")
                    direction_id = trip.get("direction_id")
                    # Skip routes that have no service today
                    if route_id not in relevant_route_ids:
                        continue
                    # Skip trips that have not started yet; missing fields are not treated as future
                    if (start_date and start_date > today) or (start_time and start_time > current_time):
                        continue
                    # Look for the entry for any of the stops we want
                    for stop_time_update in trip_update.get("stop_time_update", []):
                        if stop_time_update.get("stop_id", "") not in wanted_stop_ids:
                            continue
                        event = stop_time_update.get("arrival", stop_time_update.get("departure", {}))
                        arrival_time = int(event.get("time", 0))
                        now_ts = int(time.time())
                        if arrival_time < now_ts:
                            continue
                        new_arrival = ArrivalTime(
                            stop_id=stop_time_update.get("stop_id"),
                            route_id=self.feed.routes[self.feed.routes["route_id"] == route_id]["route_short_name"].item(),
                            destination=self.__lookup_headsign_by_route(route_id, direction_id),
                            due_in_seconds=arrival_time - now_ts,
                            is_added=True
                        )
                        print("Added route:", new_arrival)
                        added_stops.append(new_arrival)
                elif trip_action == "CANCELED":
                    canceled_trips.add(trip_id)
                else:
                    print("Unsupported action:", trip_action)
            except Exception:
                print("Error parsing GTFS-R entry:", str(e))
                raise
        return deltas, canceled_trips, added_stops

    def get_next_n_buses(self, num_entries: int) -> pd.DataFrame:
        """
        Return the next ``num_entries`` scheduled arrivals at the requested stops (today's
        services only), enriched with the trip, stop and route columns of the feed.
        """
        service_ids = self.__current_service_ids()
        trip_ids = self.__trip_ids_for_service_ids(service_ids)
        next_buses = self.__next_n_buses(trip_ids, num_entries)
        return self.__join_data(next_buses)

    def start(self) -> None:
        """
        Start the periodic refresh thread (if one was configured) and refresh once immediately.
        """
        if self._refresh_thread is not None:
            self._refresh_thread.start()
        self.refresh()

    def refresh(self) -> list:
        """
        Build the list of upcoming arrivals, put it on the update queue (if any) and return it.

        Scheduled arrivals are shifted by the latest GTFS-R delays, canceled trips are dropped
        and GTFS-R added arrivals are merged in. At most 5 arrivals are returned.
        """
        # Retrieve the GTFS-R deltas
        deltas, canceled_trips, added_stops = self.__poll_gtfsr_deltas()
        if deltas or canceled_trips or added_stops:
            # Only update the cached GTFS-R data if the API returns data,
            # so an empty/failed response keeps the previous data
            self.deltas = deltas
            self.canceled_trips = canceled_trips
            self.added_stops = added_stops
        arrivals = []
        # Take more entries than we need in case there are cancelations
        buses = self.get_next_n_buses(10)
        for _, bus in buses.iterrows():
            if bus["trip_id"] in self.canceled_trips:
                continue
            delta = self.deltas.get(bus["trip_id"], {}).get(bus["stop_id"], 0)
            if delta != 0:
                print("Delta for route {} stop {} is {}".format(bus["route_short_name"], bus["stop_id"], delta))
            arrivals.append(ArrivalTime(
                stop_id=bus["stop_id"],
                route_id=bus["route_short_name"],
                destination=bus["trip_headsign"],
                due_in_seconds=self.__due_in_seconds(bus["arrival_time"]) + delta,
                is_added=False
            ))
        if self.added_stops:
            # Append the added stops from GTFS-R and re-sort (relies on ArrivalTime's ordering)
            arrivals.extend(self.added_stops)
            arrivals.sort()
        # Select the first 5 of what remains
        arrivals = arrivals[:5]
        if self._update_queue:
            self._update_queue.put(arrivals)
        gc.collect()
        return arrivals
def every(delay, task) -> None:
    """
    Run ``task`` every ``delay`` seconds, forever.

    Any ``Exception`` raised by ``task`` is printed and the schedule carries on.
    Runs stay aligned to the original ``delay`` grid: if a run overruns, the
    missed slots are skipped rather than executed back-to-back.
    Adapted from https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
    """
    deadline = time.time() + delay
    while True:
        remaining = deadline - time.time()
        time.sleep(remaining if remaining > 0 else 0)
        try:
            task()
        except Exception:
            # A failing task must not kill the scheduler; report it and keep going.
            # (A logger.exception(...) call would be the production-grade alternative.)
            traceback.print_exc()
        # Advance by every whole period already elapsed, plus one more.
        elapsed_periods = (time.time() - deadline) // delay
        deadline += elapsed_periods * delay + delay