from arrival_times import ArrivalTime import datetime import gc import gtfs_kit as gk import json import os import pandas as pd import queue import refresh_feed import requests import time import threading import traceback import zipfile class GTFSClient(): def __init__(self, feed_url: str, gtfs_r_url: str, gtfs_r_api_key: str, stop_codes: list[str], update_queue: queue.Queue, update_interval_seconds: int = 60): self.stop_codes = stop_codes feed_name = feed_url.split('/')[-1] self.gtfs_r_url = gtfs_r_url self.gtfs_r_api_key = gtfs_r_api_key # Make sure that the feed file is up to date try: last_mtime = os.stat(feed_name).st_mtime except: last_mtime = 0 refreshed, new_mtime = refresh_feed.update_local_file_from_url_v1(last_mtime, feed_name, feed_url) if refreshed: print("The feed file was refreshed.") else: print("The feed file was up to date") # Load the feed self.feed = self._read_feed(feed_name, dist_units='km', stop_codes = stop_codes) gc.collect() self.stop_ids = self.__wanted_stop_ids() self.deltas = {} self.canceled_trips = set() self.added_stops = [] # Schedule refresh self._update_queue = update_queue if update_interval_seconds and update_queue: self._update_interval_seconds = update_interval_seconds self._refresh_thread = threading.Thread(target=lambda: every(update_interval_seconds, self.refresh)) def _read_feed(self, path: gk.Path, dist_units: str, stop_codes: list[str]) -> gk.Feed: """ NOTE: This helper method was extracted from gtfs_kit.feed to modify it to only load the stop_times for the stops we are interested in, because loading the entire feed would use more memory than the Raspberry Pi Zero W has. This version also reads CSV data straight from the zip file to avoid wearing out the Pi's SD card. """ FILES_TO_LOAD = [ # List of feed files to load. stop_times.txt is loaded separately. 'trips.txt', 'routes.txt', 'calendar.txt', 'calendar_dates.txt', 'stops.txt', 'agency.txt' ] path = gk.Path(path) if not path.exists(): raise ValueError(f"Path {path} does not exist") gc.collect() feed_dict = {table: None for table in gk.cs.GTFS_REF["table"]} with zipfile.ZipFile(path) as z: for filename in FILES_TO_LOAD: table = filename.split(".")[0] # read the file with z.open(filename) as f: df = pd.read_csv(f, dtype=gk.cs.DTYPE, encoding="utf-8-sig") if not df.empty: feed_dict[table] = gk.cn.clean_column_names(df) gc.collect() # Finally, load stop_times.txt # Obtain the list of IDs of the desired stops. This is similar to what __wanted_stop_ids() does, # but without a dependency on a fully formed feed object wanted_stop_ids = feed_dict.get("stops")[feed_dict.get("stops")["stop_code"].isin(self.stop_codes)]["stop_id"] with z.open("stop_times.txt") as f: iter_csv = pd.read_csv(f, iterator=True, chunksize=1000, dtype=gk.cs.DTYPE, encoding="utf-8-sig") df = pd.concat([chunk[chunk["stop_id"].isin(wanted_stop_ids)] for chunk in iter_csv]) gc.collect() if not df.empty: # Fix arrival and departure times so that comparisons work the way they are expected to df["arrival_time"] = df.apply(lambda row: row["arrival_time"] if len(row["arrival_time"]) == 8 else "0"+row["arrival_time"], axis=1) gc.collect() df["departure_time"] = df.apply(lambda row: row["departure_time"] if len(row["departure_time"]) == 8 else "0"+row["departure_time"], axis=1) gc.collect() feed_dict["stop_times"] = gk.cn.clean_column_names(df) gc.collect() feed_dict["dist_units"] = dist_units # Create feed return gk.Feed(**feed_dict) def __wanted_stop_ids(self) -> pd.core.frame.DataFrame: """ Return a DataFrame with the ID and names of the chosen stop(s) as requested in station_names """ stops = self.feed.stops[self.feed.stops["stop_code"].isin(self.stop_codes)] if stops.empty: raise Exception("Stops is empty!") return stops["stop_id"] def __service_ids_active_at(self, when: datetime) -> pd.core.frame.DataFrame: """ Returns the service IDs active at a particular point in time """ todays_date = when.strftime("%Y%m%d") todays_weekday = when.strftime("%A").lower() active_calendars = self.feed.calendar.query('start_date <= @todays_date and end_date >= @todays_date and {} == 1'.format(todays_weekday)) return active_calendars def __current_calendars(self) -> pd.core.frame.DataFrame: """ Filter the calendar entries to find all services that apply for today. Returns an empty list if none do. """ # Take the service IDs active today now = datetime.datetime.now() now_active = self.__service_ids_active_at(now) if now_active.empty: print("There are no service IDs for today!") # Merge with the service IDs for tomorrow (in case the number of trips spills over to tomorrow) tomorrow = datetime.datetime.now() + datetime.timedelta(days=1) tomorrow_active = self.__service_ids_active_at(tomorrow) if tomorrow_active.empty: print("There are no service IDs for tomorrow!") #active_calendars = pd.concat([now_active, tomorrow_active]) active_calendars = now_active if active_calendars.empty: print("The concatenation of today and tomorrow's calendars is empty. This should not happen.") return active_calendars def __current_service_ids(self) -> pd.core.series.Series: """ Filter the calendar entries to find all service ids that apply for today. Returns an empty list if none do. """ return self.__current_calendars()["service_id"] def __trip_ids_for_service_ids(self, service_ids: pd.core.series.Series) -> pd.core.series.Series: """ Returns a dataframe with the trip IDs for the given service IDs """ trips = self.feed.trips[self.feed.trips["service_id"].isin(service_ids)] if trips.empty: print("There are no active trips!") return trips["trip_id"] def __next_n_buses(self, trip_ids: pd.core.series.Series, n: int) -> pd.core.frame.DataFrame: now = datetime.datetime.now() current_time = now.strftime("%H:%M:%S") next_stops = self.feed.stop_times[self.feed.stop_times["stop_id"].isin(self.stop_ids) & self.feed.stop_times["trip_id"].isin(trip_ids) & (self.feed.stop_times["arrival_time"] > current_time)] next_stops = next_stops.sort_values("arrival_time") return next_stops[:n][["trip_id", "arrival_time", "stop_id"]] def __join_data(self, next_buses: pd.core.frame.DataFrame) -> pd.core.frame.DataFrame: """ Enriches the stop data with the information from other dataframes in the feed """ joined_data = (next_buses .join(self.feed.trips.set_index("trip_id"), on="trip_id") .join(self.feed.stops.set_index("stop_id"), on="stop_id") .join(self.feed.routes.set_index("route_id"), on="route_id")) return joined_data def __time_to_seconds(self, s: str) -> int: sx = s.split(":") if len(sx) != 3: print("Malformed timestamp:", s) return 0 return int(sx[0]) * 3600 + int(sx[1]) * 60 + int (sx[2]) def __due_in_seconds(self, time_str: str) -> int: """ Returns the number of seconds in the future that the time_str (format hh:mm:ss) is """ now = datetime.datetime.now().strftime("%H:%M:%S") tnow = self.__time_to_seconds(now) tstop = self.__time_to_seconds(time_str) if tstop > tnow: return tstop - tnow else: # If the stop time is less than the current time, the stop is tomorrow return tstop + 86400 - tnow def __lookup_headsign_by_route(self, route_id: str, direction_id: int) -> str: """ Look up a destination string in Trips from the route and direction """ trips = self.feed.trips return trips[(trips["route_id"] == route_id) & (trips["direction_id"] == direction_id)].head(1)["trip_headsign"].item() def __poll_gtfsr_deltas(self) -> list[map, set]: # Poll GTFS-R API if self.gtfs_r_api_key != "": headers = {"x-api-key": self.gtfs_r_api_key} response = requests.get(url = self.gtfs_r_url, headers = headers) if response.status_code != 200: print("GTFS-R sent non-OK response: {}\n{}".format(response.status_code, response.text)) return ({}, [], []) deltas_json = json.loads(response.text) else: deltas_json = json.load(open("example.json")) deltas = {} canceled_trips = set() added_stops = [] # Pre-compute some data to use for added trips: relevant_service_ids = self.__current_service_ids() relevant_trips = self.feed.trips[self.feed.trips["service_id"].isin(relevant_service_ids)] relevant_route_ids = set(relevant_trips["route_id"]) today = datetime.date.today().strftime("%Y%m%d") for e in deltas_json.get("entity", []): is_deleted = e.get("is_deleted") or False try: trip_update = e.get("trip_update") trip = trip_update.get("trip") trip_id = trip.get("trip_id") trip_action = trip.get("schedule_relationship") if trip_action == "SCHEDULED": for u in e.get("trip_update", {}).get("stop_time_update", []): delay = u.get("arrival", u.get("departure", {})).get("delay", 0) deltas_for_trip = (deltas.get(trip_id) or {}) deltas_for_trip[u.get("stop_id")] = delay deltas[trip_id] = deltas_for_trip elif trip_action == "ADDED": start_date = trip.get("start_date") start_time = trip.get("start_time") route_id = trip.get("route_id") direction_id = trip.get("direction_id") # Check if the route is part of the routes we care about if not route_id in relevant_route_ids: continue # And that it's for today current_time = datetime.datetime.now().strftime("%H:%M:%S") if start_date > today or start_time > current_time: continue # Look for the entry for any of the stops we want wanted_stop_ids = self.__wanted_stop_ids() for stop_time_update in e.get("trip_update").get("stop_time_update", []): if stop_time_update.get("stop_id", "") in wanted_stop_ids: arrival_time = int((stop_time_update.get("arrival", stop_time_update.get("departure", {})).get("time", 0))) if arrival_time < int(time.time()): continue new_arrival = ArrivalTime( stop_id = stop_time_update.get("stop_id"), route_id = self.feed.routes[self.feed.routes["route_id"] == route_id]["route_short_name"].item(), destination = self.__lookup_headsign_by_route(route_id, direction_id), due_in_seconds = arrival_time - int(time.time()), is_added = True ) print("Added route:", new_arrival) added_stops.append(new_arrival) elif trip_action == "CANCELED": canceled_trips.add(trip_id) else: print("Unsupported action:", trip_action) except Exception as x: print("Error parsing GTFS-R entry:", str(e)) raise(x) return deltas, canceled_trips, added_stops def get_next_n_buses(self, num_entries: int) -> pd.core.frame.DataFrame: """ Returns a dataframe with the information of the next N buses arriving at the requested stops. """ service_ids = self.__current_service_ids() trip_ids = self.__trip_ids_for_service_ids(service_ids) next_buses = self.__next_n_buses(trip_ids, num_entries) joined_data = self.__join_data(next_buses) return joined_data def start(self) -> None: """ Start the refresh thread """ self._refresh_thread.start() self.refresh() def refresh(self): """ Create and enqueue the refreshed stop data """ # Retrieve the GTFS-R deltas deltas, canceled_trips, added_stops = self.__poll_gtfsr_deltas() if len(deltas) > 0 or len(canceled_trips) > 0 or len(added_stops) > 0: # Only update deltas and canceled trips if the API returns data self.deltas = deltas self.canceled_trips = canceled_trips self.added_stops = added_stops arrivals = [] # take more entries than we need in case there are cancelations buses = self.get_next_n_buses(10) for index, bus in buses.iterrows(): if not bus["trip_id"] in self.canceled_trips: delta = self.deltas.get(bus["trip_id"], {}).get(bus["stop_id"], 0) if delta != 0: print("Delta for route {} stop {} is {}".format(bus["route_short_name"], bus["stop_id"], delta)) arrival = ArrivalTime(stop_id = bus["stop_id"], route_id = bus["route_short_name"], destination = bus["trip_headsign"], due_in_seconds = self.__due_in_seconds(bus["arrival_time"]) + delta, is_added = False ) arrivals.append(arrival) if len(self.added_stops) > 0: # Append the added stops from GTFS-R and re-sort arrivals.extend(self.added_stops) arrivals.sort() # Select the first 5 of what remains arrivals = arrivals[0:5] if self._update_queue: self._update_queue.put(arrivals) gc.collect() return arrivals def every(delay, task) -> None: """ Auxilliary function to schedule updates. Taken from https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds """ next_time = time.time() + delay while True: time.sleep(max(0, next_time - time.time())) try: task() except Exception: traceback.print_exc() # in production code you might want to have this instead of course: # logger.exception("Problem while executing repetitive task.") # skip tasks if we are behind schedule: next_time += (time.time() - next_time) // delay * delay + delay