First version
This commit is contained in:
parent
519bc0aee2
commit
01a97d19a2
|
|
@ -0,0 +1,4 @@
|
||||||
|
__pycache__/arrival_times.cpython-310.pyc
|
||||||
|
__pycache__/dublinbus_soap_client.cpython-310.pyc
|
||||||
|
jd_lcd_rounded.ttf
|
||||||
|
mockup.svg
|
||||||
14
README.md
14
README.md
|
|
@ -1,2 +1,16 @@
|
||||||
# dublinbus-sign
|
# dublinbus-sign
|
||||||
Emulates a Dublin Bus electronic sign, showing ETAs for different bus lines
|
Emulates a Dublin Bus electronic sign, showing ETAs for different bus lines
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
* Install all dependencies
|
||||||
|
* Download the TTF font into the program's directory
|
||||||
|
* Change main.py, updating STOPS to reflect the stop(s) you want to watch.
|
||||||
|
* Run main.py
|
||||||
|
|
||||||
|
## Dependencies
|
||||||
|
|
||||||
|
* iso8601
|
||||||
|
* pygame
|
||||||
|
* zeep
|
||||||
|
* TTF Font jd_lcd_rounded.ttf by Jecko Development : https://fontstruct.com/fontstructions/show/459792/jd_lcd_rounded
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,18 @@
|
||||||
|
class ArrivalTime():
|
||||||
|
""" Represents the arrival times of buses at one of the configured stops """
|
||||||
|
|
||||||
|
def __init__(self, stop_id: int, route_id: str, destination: str, due_in_seconds: int) -> None:
|
||||||
|
self.stop_id = stop_id
|
||||||
|
self.route_id = route_id
|
||||||
|
self.destination = destination
|
||||||
|
self.due_in_seconds = due_in_seconds
|
||||||
|
|
||||||
|
@property
|
||||||
|
def due_in_minutes(self) -> int:
|
||||||
|
return int(self.due_in_seconds / 60)
|
||||||
|
|
||||||
|
def isDue(self) -> bool:
|
||||||
|
return self.due_in_minutes < 1
|
||||||
|
|
||||||
|
def __lt__(self, other) -> int:
|
||||||
|
return self.due_in_seconds < other.due_in_seconds
|
||||||
|
|
@ -0,0 +1,81 @@
|
||||||
|
import iso8601
|
||||||
|
import queue
|
||||||
|
import time
|
||||||
|
import traceback
|
||||||
|
import threading
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
from zeep import Client
|
||||||
|
from arrival_times import ArrivalTime
|
||||||
|
|
||||||
|
# Path to StopData elements within the XML
|
||||||
|
STOPDATA_PATH = ('{http://schemas.xmlsoap.org/soap/envelope/}Body/' +
|
||||||
|
'{http://dublinbus.ie/}GetRealTimeStopDataResponse/' +
|
||||||
|
'{http://dublinbus.ie/}GetRealTimeStopDataResult/' +
|
||||||
|
'{urn:schemas-microsoft-com:xml-diffgram-v1}diffgram/' +
|
||||||
|
'DocumentElement')
|
||||||
|
|
||||||
|
|
||||||
|
def every(delay, task) -> None:
|
||||||
|
""" Auxilliary function to schedule updates.
|
||||||
|
Taken from https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
|
||||||
|
"""
|
||||||
|
next_time = time.time() + delay
|
||||||
|
while True:
|
||||||
|
time.sleep(max(0, next_time - time.time()))
|
||||||
|
try:
|
||||||
|
task()
|
||||||
|
except Exception:
|
||||||
|
traceback.print_exc()
|
||||||
|
# in production code you might want to have this instead of course:
|
||||||
|
# logger.exception("Problem while executing repetitive task.")
|
||||||
|
# skip tasks if we are behind schedule:
|
||||||
|
next_time += (time.time() - next_time) // delay * delay + delay
|
||||||
|
|
||||||
|
|
||||||
|
def parse_seconds(time_expr: str) -> int:
|
||||||
|
""" Parses an XML timestamp and returns the seconds since the epoch
|
||||||
|
E.g. 2022-09-10T20:33:30.387+01:00
|
||||||
|
"""
|
||||||
|
return int(iso8601.parse_date(time_expr).timestamp())
|
||||||
|
|
||||||
|
|
||||||
|
class DublinBusSoapClient:
|
||||||
|
""" Code to pull updates of the requested stops """
|
||||||
|
|
||||||
|
def __init__(self, stops: list[str], update_queue: queue.Queue, update_interval_seconds: int = 60) -> None :
|
||||||
|
|
||||||
|
# Store parameters
|
||||||
|
self._stops = stops
|
||||||
|
self._update_queue = update_queue
|
||||||
|
self._update_interval_seconds = update_interval_seconds
|
||||||
|
|
||||||
|
# Create SOAP Client
|
||||||
|
self._client = Client('http://rtpi.dublinbus.ie/DublinBusRTPIService.asmx?WSDL')
|
||||||
|
|
||||||
|
# Schedule refresh
|
||||||
|
self._refresh_thread = threading.Thread(target=lambda: every(self._update_interval_seconds, self.refresh))
|
||||||
|
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
""" Start the refresh thread """
|
||||||
|
self._refresh_thread.start()
|
||||||
|
self.refresh()
|
||||||
|
|
||||||
|
def refresh(self) -> None:
|
||||||
|
arrivals = []
|
||||||
|
for stop in self._stops:
|
||||||
|
with self._client.settings(raw_response=True):
|
||||||
|
response = self._client.service.GetRealTimeStopData(stopId=stop, forceRefresh=True)
|
||||||
|
if response.ok:
|
||||||
|
tree = ET.fromstring(response.text)
|
||||||
|
stopdata_elements = tree.find(STOPDATA_PATH)
|
||||||
|
for stopdata in (stopdata_elements or []):
|
||||||
|
route = stopdata.find('MonitoredVehicleJourney_PublishedLineName').text
|
||||||
|
destination = stopdata.find('MonitoredVehicleJourney_DestinationName').text
|
||||||
|
due_in_seconds = (parse_seconds(stopdata.find('MonitoredCall_ExpectedArrivalTime').text)
|
||||||
|
- parse_seconds(stopdata.find('Timestamp').text))
|
||||||
|
arrival_time = ArrivalTime(stop, route, destination, due_in_seconds)
|
||||||
|
arrivals.append(arrival_time)
|
||||||
|
arrivals = sorted(arrivals)
|
||||||
|
self._update_queue.put(arrivals)
|
||||||
|
|
||||||
|
|
@ -0,0 +1,108 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from glob import glob
|
||||||
|
import pygame
|
||||||
|
from pygame.locals import *
|
||||||
|
from time import sleep
|
||||||
|
from dublinbus_soap_client import DublinBusSoapClient
|
||||||
|
import queue
|
||||||
|
from arrival_times import ArrivalTime
|
||||||
|
|
||||||
|
# Constants
|
||||||
|
# The font is JD LCD Rounded by Jecko Development
|
||||||
|
# https://fontstruct.com/fontstructions/show/459792/jd_lcd_rounded
|
||||||
|
TEXT_FONT = 'jd_lcd_rounded.ttf'
|
||||||
|
LINE_COUNT = 5
|
||||||
|
COLOR_LCD : pygame.Color = pygame.Color(244, 203, 96)
|
||||||
|
COLOR_BACKGROUND = pygame.Color(0, 0, 0)
|
||||||
|
UPDATE_INTERVAL_SECONDS = 30
|
||||||
|
TEXT_SIZE = 80 # Size of the font in pixels
|
||||||
|
STOPS = [
|
||||||
|
2410, # College Drive
|
||||||
|
1114 # Priory Walk
|
||||||
|
]
|
||||||
|
|
||||||
|
# Offsets of each part within a line
|
||||||
|
XOFFSET_ROUTE = 12 # 1920x720 -> 24
|
||||||
|
XOFFSET_DESTINATION = 150 # 1920x720 -> 300
|
||||||
|
XOFFSEET_TIME_LEFT = 803 # 1920x720 -> 1606
|
||||||
|
INTER_LINE_SPACE = 0 # 1920x720 -> 0
|
||||||
|
|
||||||
|
# Some global variables
|
||||||
|
window : pygame.Surface = None
|
||||||
|
font: pygame.font.Font = None
|
||||||
|
update_queue = queue.Queue(maxsize=10)
|
||||||
|
dublinbus_client = DublinBusSoapClient(stops=STOPS, update_queue=update_queue, update_interval_seconds=UPDATE_INTERVAL_SECONDS)
|
||||||
|
|
||||||
|
|
||||||
|
def get_line_offset(line: int) -> int:
|
||||||
|
global font
|
||||||
|
return line * (font.get_height() + INTER_LINE_SPACE)
|
||||||
|
|
||||||
|
def write_entry(line: int,
|
||||||
|
route: str = '', destination: str = '', time_left: str = '',
|
||||||
|
time_color: Color = COLOR_LCD, text_color: Color = COLOR_LCD):
|
||||||
|
# Step 1: Render the fragments
|
||||||
|
route_img = font.render(route[0:4], True, text_color)
|
||||||
|
destination_img = font.render(destination[0:21], True, text_color)
|
||||||
|
time_left_img = font.render(time_left[0:5], True, time_color)
|
||||||
|
|
||||||
|
# Compose the line
|
||||||
|
vertical_offset = get_line_offset(line)
|
||||||
|
window.blit(route_img, dest=(XOFFSET_ROUTE, vertical_offset))
|
||||||
|
window.blit(destination_img, dest=(XOFFSET_DESTINATION, vertical_offset))
|
||||||
|
window.blit(time_left_img, dest=(XOFFSEET_TIME_LEFT, vertical_offset))
|
||||||
|
|
||||||
|
def update_screen(updates: list[ArrivalTime]) -> None:
|
||||||
|
""" Repaint the screen with the new arrival times """
|
||||||
|
updates = updates[0:LINE_COUNT] # take the first X lines
|
||||||
|
for line_num, update in enumerate(updates):
|
||||||
|
write_entry(
|
||||||
|
line=line_num,
|
||||||
|
route=update.route_id,
|
||||||
|
destination=update.destination,
|
||||||
|
time_left='Due' if update.isDue() else f'{update.due_in_minutes}min',
|
||||||
|
time_color=COLOR_LCD
|
||||||
|
)
|
||||||
|
|
||||||
|
def clear_screen() -> None:
|
||||||
|
pygame.draw.rect(surface=window, color=COLOR_BACKGROUND, width=0, rect=(0, 0, window.get_width(), window.get_height()))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
global font
|
||||||
|
global window
|
||||||
|
|
||||||
|
""" Main method. Initialise graph """
|
||||||
|
pygame.init()
|
||||||
|
pygame.display.init()
|
||||||
|
pygame.font.init()
|
||||||
|
window = pygame.display.set_mode(size=(960, 360), flags=DOUBLEBUF, display=1)
|
||||||
|
font = pygame.font.Font(TEXT_FONT, TEXT_SIZE)
|
||||||
|
font.set_bold(True)
|
||||||
|
|
||||||
|
# Paint black
|
||||||
|
clear_screen()
|
||||||
|
pygame.display.flip()
|
||||||
|
dublinbus_client.start()
|
||||||
|
|
||||||
|
# Main event loop
|
||||||
|
running = True
|
||||||
|
while running:
|
||||||
|
pygame.event.wait(timeout=1)
|
||||||
|
for e in pygame.event.get():
|
||||||
|
if e.type == pygame.QUIT:
|
||||||
|
running = False
|
||||||
|
#end event handling
|
||||||
|
if update_queue.not_empty:
|
||||||
|
clear_screen()
|
||||||
|
updates = update_queue.get()
|
||||||
|
update_screen(updates)
|
||||||
|
pygame.display.flip()
|
||||||
|
|
||||||
|
pygame.quit()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
iso8601==1.0.2
|
||||||
|
pygame==2.1.2
|
||||||
|
zeep==4.1.0
|
||||||
Loading…
Reference in New Issue