diff --git a/dublinbus_soap_client.py b/dublinbus_soap_client.py deleted file mode 100644 index 025dcbd..0000000 --- a/dublinbus_soap_client.py +++ /dev/null @@ -1,82 +0,0 @@ -import iso8601 -import queue -import time -import traceback -import threading -import xml.etree.ElementTree as ET -from zeep import Client -from arrival_times import ArrivalTime - -# Path to StopData elements within the XML -STOPDATA_PATH = ('{http://schemas.xmlsoap.org/soap/envelope/}Body/' + - '{http://dublinbus.ie/}GetRealTimeStopDataResponse/' + - '{http://dublinbus.ie/}GetRealTimeStopDataResult/' + - '{urn:schemas-microsoft-com:xml-diffgram-v1}diffgram/' + - 'DocumentElement') - - -def every(delay, task) -> None: - """ Auxilliary function to schedule updates. - Taken from https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds - """ - next_time = time.time() + delay - while True: - time.sleep(max(0, next_time - time.time())) - try: - task() - except Exception: - traceback.print_exc() - # in production code you might want to have this instead of course: - # logger.exception("Problem while executing repetitive task.") - # skip tasks if we are behind schedule: - next_time += (time.time() - next_time) // delay * delay + delay - - -def parse_seconds(time_expr: str) -> int: - """ Parses an XML timestamp and returns the seconds since the epoch - E.g. 2022-09-10T20:33:30.387+01:00 - """ - return int(iso8601.parse_date(time_expr).timestamp()) - - -class DublinBusSoapClient: - """ Code to pull updates of the requested stops """ - - def __init__(self, stops: list[str], update_queue: queue.Queue, update_interval_seconds: int = 60) -> None : - - # Store parameters - self._stops = stops - self._update_queue = update_queue - self._update_interval_seconds = update_interval_seconds - - # Create SOAP Client - self._client = Client('http://rtpi.dublinbus.ie/DublinBusRTPIService.asmx?WSDL') - - # Schedule refresh - self._refresh_thread = threading.Thread(target=lambda: every(self._update_interval_seconds, self.refresh)) - - - def start(self) -> None: - """ Start the refresh thread """ - self._refresh_thread.start() - self.refresh() - - def refresh(self) -> None: - """ Poll for new and updated information. Queue it for display update. """ - arrivals = [] - for stop in self._stops: - with self._client.settings(raw_response=True): - response = self._client.service.GetRealTimeStopData(stopId=stop, forceRefresh=True) - if response.ok: - tree = ET.fromstring(response.text) - stopdata_elements = tree.find(STOPDATA_PATH) - for stopdata in (stopdata_elements or []): - route = stopdata.find('MonitoredVehicleJourney_PublishedLineName').text - destination = stopdata.find('MonitoredVehicleJourney_DestinationName').text - due_in_seconds = (parse_seconds(stopdata.find('MonitoredCall_ExpectedArrivalTime').text) - - parse_seconds(stopdata.find('Timestamp').text)) - arrival_time = ArrivalTime(stop, route, destination, due_in_seconds) - arrivals.append(arrival_time) - arrivals = sorted(arrivals) - self._update_queue.put(arrivals) - \ No newline at end of file diff --git a/main.py b/main.py index b54ea06..cfc24e6 100755 --- a/main.py +++ b/main.py @@ -5,7 +5,6 @@ from glob import glob import pygame from pygame.locals import * from time import sleep -from dublinbus_soap_client import DublinBusSoapClient import queue from arrival_times import ArrivalTime from gtfs_client import GTFSClient