From 35d026168222aefa4b5b8f2ccb8ae0ee48bb9d61 Mon Sep 17 00:00:00 2001 From: Nahuel Lofeudo Date: Sat, 23 Sep 2023 21:21:34 +0100 Subject: [PATCH] Make the GTFS-R client resilient to network errors. --- gtfs_client.py | 143 +++++++++++++++++++++++++------------------------ 1 file changed, 73 insertions(+), 70 deletions(-) diff --git a/gtfs_client.py b/gtfs_client.py index 29af4a8..dc7f552 100644 --- a/gtfs_client.py +++ b/gtfs_client.py @@ -253,83 +253,87 @@ class GTFSClient(): def __poll_gtfsr_deltas(self) -> list[map, set]: - # Poll GTFS-R API - if self.gtfs_r_api_key != "": - headers = {"x-api-key": self.gtfs_r_api_key} - response = requests.get(url = self.gtfs_r_url, headers = headers) - if response.status_code != 200: - print("GTFS-R sent non-OK response: {}\n{}".format(response.status_code, response.text)) - return ({}, [], []) + try: + # Poll GTFS-R API + if self.gtfs_r_api_key != "": + headers = {"x-api-key": self.gtfs_r_api_key} + response = requests.get(url = self.gtfs_r_url, headers = headers) + if response.status_code != 200: + print("GTFS-R sent non-OK response: {}\n{}".format(response.status_code, response.text)) + return ({}, [], []) - deltas_json = json.loads(response.text) - else: - deltas_json = json.load(open("example.json")) + deltas_json = json.loads(response.text) + else: + deltas_json = json.load(open("example.json")) - deltas = {} - canceled_trips = set() - added_stops = [] + deltas = {} + canceled_trips = set() + added_stops = [] - # Pre-compute some data to use for added trips: - relevant_service_ids = self.__current_service_ids() - relevant_trips = self.feed.trips[self.feed.trips["service_id"].isin(relevant_service_ids)] - relevant_route_ids = set(relevant_trips["route_id"]) - today = datetime.date.today().strftime("%Y%m%d") + # Pre-compute some data to use for added trips: + relevant_service_ids = self.__current_service_ids() + relevant_trips = self.feed.trips[self.feed.trips["service_id"].isin(relevant_service_ids)] + relevant_route_ids = set(relevant_trips["route_id"]) + today = datetime.date.today().strftime("%Y%m%d") - for e in deltas_json.get("entity", []): - is_deleted = e.get("is_deleted") or False - try: - trip_update = e.get("trip_update") - trip = trip_update.get("trip") - trip_id = trip.get("trip_id") - trip_action = trip.get("schedule_relationship") - if trip_action == "SCHEDULED": - for u in e.get("trip_update", {}).get("stop_time_update", []): - delay = u.get("arrival", u.get("departure", {})).get("delay", 0) - deltas_for_trip = (deltas.get(trip_id) or {}) - deltas_for_trip[u.get("stop_id")] = delay - deltas[trip_id] = deltas_for_trip + for e in deltas_json.get("entity", []): + is_deleted = e.get("is_deleted") or False + try: + trip_update = e.get("trip_update") + trip = trip_update.get("trip") + trip_id = trip.get("trip_id") + trip_action = trip.get("schedule_relationship") + if trip_action == "SCHEDULED": + for u in e.get("trip_update", {}).get("stop_time_update", []): + delay = u.get("arrival", u.get("departure", {})).get("delay", 0) + deltas_for_trip = (deltas.get(trip_id) or {}) + deltas_for_trip[u.get("stop_id")] = delay + deltas[trip_id] = deltas_for_trip - elif trip_action == "ADDED": - start_date = trip.get("start_date") - start_time = trip.get("start_time") - route_id = trip.get("route_id") - direction_id = trip.get("direction_id") - - # Check if the route is part of the routes we care about - if not route_id in relevant_route_ids: - continue + elif trip_action == "ADDED": + start_date = trip.get("start_date") + start_time = trip.get("start_time") + route_id = trip.get("route_id") + direction_id = trip.get("direction_id") - # And that it's for today - current_time = datetime.datetime.now().strftime("%H:%M:%S") - if start_date > today or start_time > current_time: - continue + # Check if the route is part of the routes we care about + if not route_id in relevant_route_ids: + continue - # Look for the entry for any of the stops we want - wanted_stop_ids = self.__wanted_stop_ids() - for stop_time_update in e.get("trip_update").get("stop_time_update", []): - if stop_time_update.get("stop_id", "") in wanted_stop_ids: - arrival_time = int((stop_time_update.get("arrival", stop_time_update.get("departure", {})).get("time", 0))) - if arrival_time < int(time.time()): - continue - new_arrival = ArrivalTime( - stop_id = stop_time_update.get("stop_code"), - route_id = self.feed.routes[self.feed.routes["route_id"] == route_id]["route_short_name"].item(), - destination = self.__lookup_headsign_by_route(route_id, direction_id), - due_in_seconds = arrival_time - int(time.time()), - is_added = True - ) - print("Added route:", new_arrival) - added_stops.append(new_arrival) + # And that it's for today + current_time = datetime.datetime.now().strftime("%H:%M:%S") + if start_date > today or start_time > current_time: + continue - elif trip_action == "CANCELED": - canceled_trips.add(trip_id) - else: - print("Unsupported action:", trip_action) - except Exception as x: - print("Error parsing GTFS-R entry:", str(e)) - raise(x) - - return deltas, canceled_trips, added_stops + # Look for the entry for any of the stops we want + wanted_stop_ids = self.__wanted_stop_ids() + for stop_time_update in e.get("trip_update").get("stop_time_update", []): + if stop_time_update.get("stop_id", "") in wanted_stop_ids: + arrival_time = int((stop_time_update.get("arrival", stop_time_update.get("departure", {})).get("time", 0))) + if arrival_time < int(time.time()): + continue + new_arrival = ArrivalTime( + stop_id = stop_time_update.get("stop_code"), + route_id = self.feed.routes[self.feed.routes["route_id"] == route_id]["route_short_name"].item(), + destination = self.__lookup_headsign_by_route(route_id, direction_id), + due_in_seconds = arrival_time - int(time.time()), + is_added = True + ) + print("Added route:", new_arrival) + added_stops.append(new_arrival) + + elif trip_action == "CANCELED": + canceled_trips.add(trip_id) + else: + print("Unsupported action:", trip_action) + except Exception as x: + print("Error parsing GTFS-R entry:", str(e)) + raise(x) + + return deltas, canceled_trips, added_stops + except Exception as e: + print("Polling for GTFS-R failed:", str(e)) + return ({}, [], []) def get_next_n_buses(self, num_entries: int) -> pd.core.frame.DataFrame: @@ -348,7 +352,6 @@ class GTFSClient(): """ Create and enqueue the refreshed stop data """ - print("Refresh") # Retrieve the GTFS-R deltas deltas, canceled_trips, added_stops = self.__poll_gtfsr_deltas() if len(deltas) > 0 or len(canceled_trips) > 0 or len(added_stops) > 0: