Remove the original Dublin Bus SOAP API client.
This commit is contained in:
parent
b8ec772c51
commit
1c50e6ae3b
|
|
@ -1,82 +0,0 @@
|
|||
import iso8601
|
||||
import queue
|
||||
import time
|
||||
import traceback
|
||||
import threading
|
||||
import xml.etree.ElementTree as ET
|
||||
from zeep import Client
|
||||
from arrival_times import ArrivalTime
|
||||
|
||||
# Path to StopData elements within the XML
|
||||
STOPDATA_PATH = ('{http://schemas.xmlsoap.org/soap/envelope/}Body/' +
|
||||
'{http://dublinbus.ie/}GetRealTimeStopDataResponse/' +
|
||||
'{http://dublinbus.ie/}GetRealTimeStopDataResult/' +
|
||||
'{urn:schemas-microsoft-com:xml-diffgram-v1}diffgram/' +
|
||||
'DocumentElement')
|
||||
|
||||
|
||||
def every(delay, task) -> None:
|
||||
""" Auxilliary function to schedule updates.
|
||||
Taken from https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
|
||||
"""
|
||||
next_time = time.time() + delay
|
||||
while True:
|
||||
time.sleep(max(0, next_time - time.time()))
|
||||
try:
|
||||
task()
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
# in production code you might want to have this instead of course:
|
||||
# logger.exception("Problem while executing repetitive task.")
|
||||
# skip tasks if we are behind schedule:
|
||||
next_time += (time.time() - next_time) // delay * delay + delay
|
||||
|
||||
|
||||
def parse_seconds(time_expr: str) -> int:
|
||||
""" Parses an XML timestamp and returns the seconds since the epoch
|
||||
E.g. 2022-09-10T20:33:30.387+01:00
|
||||
"""
|
||||
return int(iso8601.parse_date(time_expr).timestamp())
|
||||
|
||||
|
||||
class DublinBusSoapClient:
|
||||
""" Code to pull updates of the requested stops """
|
||||
|
||||
def __init__(self, stops: list[str], update_queue: queue.Queue, update_interval_seconds: int = 60) -> None :
|
||||
|
||||
# Store parameters
|
||||
self._stops = stops
|
||||
self._update_queue = update_queue
|
||||
self._update_interval_seconds = update_interval_seconds
|
||||
|
||||
# Create SOAP Client
|
||||
self._client = Client('http://rtpi.dublinbus.ie/DublinBusRTPIService.asmx?WSDL')
|
||||
|
||||
# Schedule refresh
|
||||
self._refresh_thread = threading.Thread(target=lambda: every(self._update_interval_seconds, self.refresh))
|
||||
|
||||
|
||||
def start(self) -> None:
|
||||
""" Start the refresh thread """
|
||||
self._refresh_thread.start()
|
||||
self.refresh()
|
||||
|
||||
def refresh(self) -> None:
|
||||
""" Poll for new and updated information. Queue it for display update. """
|
||||
arrivals = []
|
||||
for stop in self._stops:
|
||||
with self._client.settings(raw_response=True):
|
||||
response = self._client.service.GetRealTimeStopData(stopId=stop, forceRefresh=True)
|
||||
if response.ok:
|
||||
tree = ET.fromstring(response.text)
|
||||
stopdata_elements = tree.find(STOPDATA_PATH)
|
||||
for stopdata in (stopdata_elements or []):
|
||||
route = stopdata.find('MonitoredVehicleJourney_PublishedLineName').text
|
||||
destination = stopdata.find('MonitoredVehicleJourney_DestinationName').text
|
||||
due_in_seconds = (parse_seconds(stopdata.find('MonitoredCall_ExpectedArrivalTime').text)
|
||||
- parse_seconds(stopdata.find('Timestamp').text))
|
||||
arrival_time = ArrivalTime(stop, route, destination, due_in_seconds)
|
||||
arrivals.append(arrival_time)
|
||||
arrivals = sorted(arrivals)
|
||||
self._update_queue.put(arrivals)
|
||||
|
||||
Loading…
Reference in New Issue