Make the GTFS-R client resilient to network errors.
This commit is contained in:
parent
36749669b4
commit
35d0261682
139
gtfs_client.py
139
gtfs_client.py
|
|
@ -253,83 +253,87 @@ class GTFSClient():
|
||||||
|
|
||||||
def __poll_gtfsr_deltas(self) -> list[map, set]:
|
def __poll_gtfsr_deltas(self) -> list[map, set]:
|
||||||
|
|
||||||
# Poll GTFS-R API
|
try:
|
||||||
if self.gtfs_r_api_key != "":
|
# Poll GTFS-R API
|
||||||
headers = {"x-api-key": self.gtfs_r_api_key}
|
if self.gtfs_r_api_key != "":
|
||||||
response = requests.get(url = self.gtfs_r_url, headers = headers)
|
headers = {"x-api-key": self.gtfs_r_api_key}
|
||||||
if response.status_code != 200:
|
response = requests.get(url = self.gtfs_r_url, headers = headers)
|
||||||
print("GTFS-R sent non-OK response: {}\n{}".format(response.status_code, response.text))
|
if response.status_code != 200:
|
||||||
return ({}, [], [])
|
print("GTFS-R sent non-OK response: {}\n{}".format(response.status_code, response.text))
|
||||||
|
return ({}, [], [])
|
||||||
|
|
||||||
deltas_json = json.loads(response.text)
|
deltas_json = json.loads(response.text)
|
||||||
else:
|
else:
|
||||||
deltas_json = json.load(open("example.json"))
|
deltas_json = json.load(open("example.json"))
|
||||||
|
|
||||||
deltas = {}
|
deltas = {}
|
||||||
canceled_trips = set()
|
canceled_trips = set()
|
||||||
added_stops = []
|
added_stops = []
|
||||||
|
|
||||||
# Pre-compute some data to use for added trips:
|
# Pre-compute some data to use for added trips:
|
||||||
relevant_service_ids = self.__current_service_ids()
|
relevant_service_ids = self.__current_service_ids()
|
||||||
relevant_trips = self.feed.trips[self.feed.trips["service_id"].isin(relevant_service_ids)]
|
relevant_trips = self.feed.trips[self.feed.trips["service_id"].isin(relevant_service_ids)]
|
||||||
relevant_route_ids = set(relevant_trips["route_id"])
|
relevant_route_ids = set(relevant_trips["route_id"])
|
||||||
today = datetime.date.today().strftime("%Y%m%d")
|
today = datetime.date.today().strftime("%Y%m%d")
|
||||||
|
|
||||||
for e in deltas_json.get("entity", []):
|
for e in deltas_json.get("entity", []):
|
||||||
is_deleted = e.get("is_deleted") or False
|
is_deleted = e.get("is_deleted") or False
|
||||||
try:
|
try:
|
||||||
trip_update = e.get("trip_update")
|
trip_update = e.get("trip_update")
|
||||||
trip = trip_update.get("trip")
|
trip = trip_update.get("trip")
|
||||||
trip_id = trip.get("trip_id")
|
trip_id = trip.get("trip_id")
|
||||||
trip_action = trip.get("schedule_relationship")
|
trip_action = trip.get("schedule_relationship")
|
||||||
if trip_action == "SCHEDULED":
|
if trip_action == "SCHEDULED":
|
||||||
for u in e.get("trip_update", {}).get("stop_time_update", []):
|
for u in e.get("trip_update", {}).get("stop_time_update", []):
|
||||||
delay = u.get("arrival", u.get("departure", {})).get("delay", 0)
|
delay = u.get("arrival", u.get("departure", {})).get("delay", 0)
|
||||||
deltas_for_trip = (deltas.get(trip_id) or {})
|
deltas_for_trip = (deltas.get(trip_id) or {})
|
||||||
deltas_for_trip[u.get("stop_id")] = delay
|
deltas_for_trip[u.get("stop_id")] = delay
|
||||||
deltas[trip_id] = deltas_for_trip
|
deltas[trip_id] = deltas_for_trip
|
||||||
|
|
||||||
elif trip_action == "ADDED":
|
elif trip_action == "ADDED":
|
||||||
start_date = trip.get("start_date")
|
start_date = trip.get("start_date")
|
||||||
start_time = trip.get("start_time")
|
start_time = trip.get("start_time")
|
||||||
route_id = trip.get("route_id")
|
route_id = trip.get("route_id")
|
||||||
direction_id = trip.get("direction_id")
|
direction_id = trip.get("direction_id")
|
||||||
|
|
||||||
# Check if the route is part of the routes we care about
|
# Check if the route is part of the routes we care about
|
||||||
if not route_id in relevant_route_ids:
|
if not route_id in relevant_route_ids:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# And that it's for today
|
# And that it's for today
|
||||||
current_time = datetime.datetime.now().strftime("%H:%M:%S")
|
current_time = datetime.datetime.now().strftime("%H:%M:%S")
|
||||||
if start_date > today or start_time > current_time:
|
if start_date > today or start_time > current_time:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Look for the entry for any of the stops we want
|
# Look for the entry for any of the stops we want
|
||||||
wanted_stop_ids = self.__wanted_stop_ids()
|
wanted_stop_ids = self.__wanted_stop_ids()
|
||||||
for stop_time_update in e.get("trip_update").get("stop_time_update", []):
|
for stop_time_update in e.get("trip_update").get("stop_time_update", []):
|
||||||
if stop_time_update.get("stop_id", "") in wanted_stop_ids:
|
if stop_time_update.get("stop_id", "") in wanted_stop_ids:
|
||||||
arrival_time = int((stop_time_update.get("arrival", stop_time_update.get("departure", {})).get("time", 0)))
|
arrival_time = int((stop_time_update.get("arrival", stop_time_update.get("departure", {})).get("time", 0)))
|
||||||
if arrival_time < int(time.time()):
|
if arrival_time < int(time.time()):
|
||||||
continue
|
continue
|
||||||
new_arrival = ArrivalTime(
|
new_arrival = ArrivalTime(
|
||||||
stop_id = stop_time_update.get("stop_code"),
|
stop_id = stop_time_update.get("stop_code"),
|
||||||
route_id = self.feed.routes[self.feed.routes["route_id"] == route_id]["route_short_name"].item(),
|
route_id = self.feed.routes[self.feed.routes["route_id"] == route_id]["route_short_name"].item(),
|
||||||
destination = self.__lookup_headsign_by_route(route_id, direction_id),
|
destination = self.__lookup_headsign_by_route(route_id, direction_id),
|
||||||
due_in_seconds = arrival_time - int(time.time()),
|
due_in_seconds = arrival_time - int(time.time()),
|
||||||
is_added = True
|
is_added = True
|
||||||
)
|
)
|
||||||
print("Added route:", new_arrival)
|
print("Added route:", new_arrival)
|
||||||
added_stops.append(new_arrival)
|
added_stops.append(new_arrival)
|
||||||
|
|
||||||
elif trip_action == "CANCELED":
|
elif trip_action == "CANCELED":
|
||||||
canceled_trips.add(trip_id)
|
canceled_trips.add(trip_id)
|
||||||
else:
|
else:
|
||||||
print("Unsupported action:", trip_action)
|
print("Unsupported action:", trip_action)
|
||||||
except Exception as x:
|
except Exception as x:
|
||||||
print("Error parsing GTFS-R entry:", str(e))
|
print("Error parsing GTFS-R entry:", str(e))
|
||||||
raise(x)
|
raise(x)
|
||||||
|
|
||||||
return deltas, canceled_trips, added_stops
|
return deltas, canceled_trips, added_stops
|
||||||
|
except Exception as e:
|
||||||
|
print("Polling for GTFS-R failed:", str(e))
|
||||||
|
return ({}, [], [])
|
||||||
|
|
||||||
|
|
||||||
def get_next_n_buses(self, num_entries: int) -> pd.core.frame.DataFrame:
|
def get_next_n_buses(self, num_entries: int) -> pd.core.frame.DataFrame:
|
||||||
|
|
@ -348,7 +352,6 @@ class GTFSClient():
|
||||||
"""
|
"""
|
||||||
Create and enqueue the refreshed stop data
|
Create and enqueue the refreshed stop data
|
||||||
"""
|
"""
|
||||||
print("Refresh")
|
|
||||||
# Retrieve the GTFS-R deltas
|
# Retrieve the GTFS-R deltas
|
||||||
deltas, canceled_trips, added_stops = self.__poll_gtfsr_deltas()
|
deltas, canceled_trips, added_stops = self.__poll_gtfsr_deltas()
|
||||||
if len(deltas) > 0 or len(canceled_trips) > 0 or len(added_stops) > 0:
|
if len(deltas) > 0 or len(canceled_trips) > 0 or len(added_stops) > 0:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue