diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d19b3a7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__/arrival_times.cpython-310.pyc +__pycache__/dublinbus_soap_client.cpython-310.pyc +jd_lcd_rounded.ttf +mockup.svg diff --git a/README.md b/README.md index 346a923..a0ce687 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,16 @@ # dublinbus-sign Emulates a Dublin Bus electronic sign, showing ETAs for different bus lines + +## Usage + +* Install all dependencies +* Download the TTF font into the program's directory +* Change main.py, updating STOPS to reflect the stop(s) you want to watch. +* Run main.py + +## Dependencies + +* iso8601 +* pygame +* zeep +* TTF Font jd_lcd_rounded.ttf by Jecko Development : https://fontstruct.com/fontstructions/show/459792/jd_lcd_rounded diff --git a/arrival_times.py b/arrival_times.py new file mode 100644 index 0000000..114869a --- /dev/null +++ b/arrival_times.py @@ -0,0 +1,18 @@ +class ArrivalTime(): + """ Represents the arrival times of buses at one of the configured stops """ + + def __init__(self, stop_id: int, route_id: str, destination: str, due_in_seconds: int) -> None: + self.stop_id = stop_id + self.route_id = route_id + self.destination = destination + self.due_in_seconds = due_in_seconds + + @property + def due_in_minutes(self) -> int: + return int(self.due_in_seconds / 60) + + def isDue(self) -> bool: + return self.due_in_minutes < 1 + + def __lt__(self, other) -> int: + return self.due_in_seconds < other.due_in_seconds \ No newline at end of file diff --git a/dublinbus_soap_client.py b/dublinbus_soap_client.py new file mode 100644 index 0000000..54d2a08 --- /dev/null +++ b/dublinbus_soap_client.py @@ -0,0 +1,81 @@ +import iso8601 +import queue +import time +import traceback +import threading +import xml.etree.ElementTree as ET +from zeep import Client +from arrival_times import ArrivalTime + +# Path to StopData elements within the XML +STOPDATA_PATH = ('{http://schemas.xmlsoap.org/soap/envelope/}Body/' + + '{http://dublinbus.ie/}GetRealTimeStopDataResponse/' + + '{http://dublinbus.ie/}GetRealTimeStopDataResult/' + + '{urn:schemas-microsoft-com:xml-diffgram-v1}diffgram/' + + 'DocumentElement') + + +def every(delay, task) -> None: + """ Auxilliary function to schedule updates. + Taken from https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds + """ + next_time = time.time() + delay + while True: + time.sleep(max(0, next_time - time.time())) + try: + task() + except Exception: + traceback.print_exc() + # in production code you might want to have this instead of course: + # logger.exception("Problem while executing repetitive task.") + # skip tasks if we are behind schedule: + next_time += (time.time() - next_time) // delay * delay + delay + + +def parse_seconds(time_expr: str) -> int: + """ Parses an XML timestamp and returns the seconds since the epoch + E.g. 2022-09-10T20:33:30.387+01:00 + """ + return int(iso8601.parse_date(time_expr).timestamp()) + + +class DublinBusSoapClient: + """ Code to pull updates of the requested stops """ + + def __init__(self, stops: list[str], update_queue: queue.Queue, update_interval_seconds: int = 60) -> None : + + # Store parameters + self._stops = stops + self._update_queue = update_queue + self._update_interval_seconds = update_interval_seconds + + # Create SOAP Client + self._client = Client('http://rtpi.dublinbus.ie/DublinBusRTPIService.asmx?WSDL') + + # Schedule refresh + self._refresh_thread = threading.Thread(target=lambda: every(self._update_interval_seconds, self.refresh)) + + + def start(self) -> None: + """ Start the refresh thread """ + self._refresh_thread.start() + self.refresh() + + def refresh(self) -> None: + arrivals = [] + for stop in self._stops: + with self._client.settings(raw_response=True): + response = self._client.service.GetRealTimeStopData(stopId=stop, forceRefresh=True) + if response.ok: + tree = ET.fromstring(response.text) + stopdata_elements = tree.find(STOPDATA_PATH) + for stopdata in (stopdata_elements or []): + route = stopdata.find('MonitoredVehicleJourney_PublishedLineName').text + destination = stopdata.find('MonitoredVehicleJourney_DestinationName').text + due_in_seconds = (parse_seconds(stopdata.find('MonitoredCall_ExpectedArrivalTime').text) + - parse_seconds(stopdata.find('Timestamp').text)) + arrival_time = ArrivalTime(stop, route, destination, due_in_seconds) + arrivals.append(arrival_time) + arrivals = sorted(arrivals) + self._update_queue.put(arrivals) + \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..922efa7 --- /dev/null +++ b/main.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 + +from glob import glob +import pygame +from pygame.locals import * +from time import sleep +from dublinbus_soap_client import DublinBusSoapClient +import queue +from arrival_times import ArrivalTime + +# Constants +# The font is JD LCD Rounded by Jecko Development +# https://fontstruct.com/fontstructions/show/459792/jd_lcd_rounded +TEXT_FONT = 'jd_lcd_rounded.ttf' +LINE_COUNT = 5 +COLOR_LCD : pygame.Color = pygame.Color(244, 203, 96) +COLOR_BACKGROUND = pygame.Color(0, 0, 0) +UPDATE_INTERVAL_SECONDS = 30 +TEXT_SIZE = 80 # Size of the font in pixels +STOPS = [ + 2410, # College Drive + 1114 # Priory Walk +] + +# Offsets of each part within a line +XOFFSET_ROUTE = 12 # 1920x720 -> 24 +XOFFSET_DESTINATION = 150 # 1920x720 -> 300 +XOFFSEET_TIME_LEFT = 803 # 1920x720 -> 1606 +INTER_LINE_SPACE = 0 # 1920x720 -> 0 + +# Some global variables +window : pygame.Surface = None +font: pygame.font.Font = None +update_queue = queue.Queue(maxsize=10) +dublinbus_client = DublinBusSoapClient(stops=STOPS, update_queue=update_queue, update_interval_seconds=UPDATE_INTERVAL_SECONDS) + + +def get_line_offset(line: int) -> int: + global font + return line * (font.get_height() + INTER_LINE_SPACE) + +def write_entry(line: int, + route: str = '', destination: str = '', time_left: str = '', + time_color: Color = COLOR_LCD, text_color: Color = COLOR_LCD): + # Step 1: Render the fragments + route_img = font.render(route[0:4], True, text_color) + destination_img = font.render(destination[0:21], True, text_color) + time_left_img = font.render(time_left[0:5], True, time_color) + + # Compose the line + vertical_offset = get_line_offset(line) + window.blit(route_img, dest=(XOFFSET_ROUTE, vertical_offset)) + window.blit(destination_img, dest=(XOFFSET_DESTINATION, vertical_offset)) + window.blit(time_left_img, dest=(XOFFSEET_TIME_LEFT, vertical_offset)) + +def update_screen(updates: list[ArrivalTime]) -> None: + """ Repaint the screen with the new arrival times """ + updates = updates[0:LINE_COUNT] # take the first X lines + for line_num, update in enumerate(updates): + write_entry( + line=line_num, + route=update.route_id, + destination=update.destination, + time_left='Due' if update.isDue() else f'{update.due_in_minutes}min', + time_color=COLOR_LCD + ) + +def clear_screen() -> None: + pygame.draw.rect(surface=window, color=COLOR_BACKGROUND, width=0, rect=(0, 0, window.get_width(), window.get_height())) + + + +def main(): + global font + global window + + """ Main method. Initialise graph """ + pygame.init() + pygame.display.init() + pygame.font.init() + window = pygame.display.set_mode(size=(960, 360), flags=DOUBLEBUF, display=1) + font = pygame.font.Font(TEXT_FONT, TEXT_SIZE) + font.set_bold(True) + + # Paint black + clear_screen() + pygame.display.flip() + dublinbus_client.start() + + # Main event loop + running = True + while running: + pygame.event.wait(timeout=1) + for e in pygame.event.get(): + if e.type == pygame.QUIT: + running = False + #end event handling + if update_queue.not_empty: + clear_screen() + updates = update_queue.get() + update_screen(updates) + pygame.display.flip() + + pygame.quit() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..88e3307 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +iso8601==1.0.2 +pygame==2.1.2 +zeep==4.1.0