Merge deltas from the GTFS-R feed into the estimations
This commit is contained in:
parent
9877da7bc4
commit
56c0417705
|
|
@ -3,6 +3,7 @@ import refresh_feed
|
||||||
from arrival_times import ArrivalTime
|
from arrival_times import ArrivalTime
|
||||||
import datetime
|
import datetime
|
||||||
import gtfs_kit as gk
|
import gtfs_kit as gk
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import queue
|
import queue
|
||||||
|
|
@ -192,22 +193,46 @@ class GTFSClient():
|
||||||
|
|
||||||
return joined_data
|
return joined_data
|
||||||
|
|
||||||
def _time_to_seconds(self, s: str) -> int:
|
def __time_to_seconds(self, s: str) -> int:
|
||||||
sx = s.split(":")
|
sx = s.split(":")
|
||||||
if len(sx) != 3:
|
if len(sx) != 3:
|
||||||
print("Malformed timestamp:", s)
|
print("Malformed timestamp:", s)
|
||||||
return 0
|
return 0
|
||||||
return int(sx[0]) * 3600 + int(sx[1]) * 60 + int (sx[2])
|
return int(sx[0]) * 3600 + int(sx[1]) * 60 + int (sx[2])
|
||||||
|
|
||||||
def _due_in_seconds(self, time_str: str) -> int:
|
def __due_in_seconds(self, time_str: str) -> int:
|
||||||
"""
|
"""
|
||||||
Returns the number of seconds in the future that the time_str (format hh:mm:ss) is
|
Returns the number of seconds in the future that the time_str (format hh:mm:ss) is
|
||||||
"""
|
"""
|
||||||
now = datetime.datetime.now().strftime("%H:%M:%S")
|
now = datetime.datetime.now().strftime("%H:%M:%S")
|
||||||
tnow = self._time_to_seconds(now)
|
tnow = self.__time_to_seconds(now)
|
||||||
tstop = self._time_to_seconds(time_str)
|
tstop = self.__time_to_seconds(time_str)
|
||||||
return tstop - tnow
|
return tstop - tnow
|
||||||
|
|
||||||
|
def __poll_gtfsr_deltas(self) -> list[map, set]:
|
||||||
|
deltas_json = json.load(open("example.json"))
|
||||||
|
|
||||||
|
deltas = {}
|
||||||
|
canceled_trips = set()
|
||||||
|
|
||||||
|
for e in deltas_json.get("Entity"):
|
||||||
|
is_deleted = e.get("IsDeleted") or False
|
||||||
|
try:
|
||||||
|
trip_id = e.get("TripUpdate").get("Trip").get("TripId")
|
||||||
|
if e.get("TripUpdate").get("Trip").get("ScheduleRelationship") == "Scheduled":
|
||||||
|
for u in e.get("TripUpdate").get("StopTimeUpdate"):
|
||||||
|
delay = u.get("Arrival", u.get("Departure", {})).get("Delay", 0)
|
||||||
|
deltas_for_trip = (deltas.get(trip_id) or {})
|
||||||
|
deltas_for_trip[u.get("StopId")] = delay
|
||||||
|
deltas[trip_id] = deltas_for_trip
|
||||||
|
else:
|
||||||
|
canceled_trips.add(trip_id)
|
||||||
|
except Exception as x:
|
||||||
|
print("Error parsing GTFS-R entry:", str(e))
|
||||||
|
raise(x)
|
||||||
|
|
||||||
|
return deltas, canceled_trips
|
||||||
|
|
||||||
|
|
||||||
def get_next_n_buses(self, num_entries: int) -> pd.core.frame.DataFrame:
|
def get_next_n_buses(self, num_entries: int) -> pd.core.frame.DataFrame:
|
||||||
"""
|
"""
|
||||||
|
|
@ -230,18 +255,29 @@ class GTFSClient():
|
||||||
"""
|
"""
|
||||||
Create and enqueue the refreshed stop data
|
Create and enqueue the refreshed stop data
|
||||||
"""
|
"""
|
||||||
|
# Retrieve the GTFS-R deltas
|
||||||
arrivals = []
|
deltas, canceled_trips = self.__poll_gtfsr_deltas()
|
||||||
|
|
||||||
buses = self.get_next_n_buses(5)
|
#
|
||||||
|
arrivals = []
|
||||||
|
# take more entries than we need in case there are cancelations
|
||||||
|
buses = self.get_next_n_buses(10)
|
||||||
|
|
||||||
for index, bus in buses.iterrows():
|
for index, bus in buses.iterrows():
|
||||||
arrival = ArrivalTime(stop_id = bus["stop_id"],
|
if not bus["trip_id"] in canceled_trips:
|
||||||
route_id = bus["route_short_name"],
|
delta = deltas.get(bus["trip_id"], {}).get(bus["stop_id"], 0)
|
||||||
destination= bus["route_long_name"].split(" - ")[1].strip(),
|
if delta != 0:
|
||||||
due_in_seconds = self._due_in_seconds(bus["arrival_time"])
|
print("Delta for route {} stop {} is {}".format(bus["route_short_name"], bus["stop_id"], delta))
|
||||||
)
|
|
||||||
arrivals.append(arrival)
|
arrival = ArrivalTime(stop_id = bus["stop_id"],
|
||||||
|
route_id = bus["route_short_name"],
|
||||||
|
destination= bus["route_long_name"].split(" - ")[1].strip(),
|
||||||
|
due_in_seconds = self.__due_in_seconds(bus["arrival_time"]) + delta
|
||||||
|
)
|
||||||
|
arrivals.append(arrival)
|
||||||
|
|
||||||
|
# Select the first 5 of what remains
|
||||||
|
arrivals = arrivals[0:5]
|
||||||
|
|
||||||
if self._update_queue:
|
if self._update_queue:
|
||||||
self._update_queue.put(arrivals)
|
self._update_queue.put(arrivals)
|
||||||
|
|
@ -267,4 +303,4 @@ def every(delay, task) -> None:
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
c = GTFSClient('https://www.transportforireland.ie/transitData/google_transit_combined.zip',
|
c = GTFSClient('https://www.transportforireland.ie/transitData/google_transit_combined.zip',
|
||||||
['College Drive, stop 2410', 'Priory Walk, stop 1114'], None, None)
|
['College Drive, stop 2410', 'Priory Walk, stop 1114'], None, None)
|
||||||
print(c.refresh())
|
print(c.refresh())
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue