Compare commits

..

2 Commits

4 changed files with 73 additions and 15 deletions

View File

@ -8,10 +8,12 @@ host = "x86_64-unknown-linux-gnu"
chrono = "0.4"
csv = "1.4"
gtfs-structures = "0.47"
gtfs-realtime = "0.2"
log = "0.4"
reqwest = { version = "0.11", features = ["blocking"] }
sdl3 = {version = "0.17", features = ["ttf"]}
serde = { version = "1.0", features = ["derive"]}
serde_json = "1.0"
time-format = "1.2"
yaml_serde = "0.10"
zip = "8.3"

View File

@ -5,9 +5,9 @@ mod refresher;
mod realtime;
pub mod structs;
use chrono::{DateTime, Local, Timelike};
use log::{debug, trace};
use log::{debug, trace, warn};
use std::{collections::{HashMap, HashSet}, fs::File };
use gtfs_structures::{Exception, RawTrip};
use gtfs_structures::{ContinuousPickupDropOff::ArrangeByPhone, Exception, RawTrip};
use crate::gtfs::{loader::load_gtfs, structs::{Arrival, Gtfs, Preferences, Error}};
@ -82,9 +82,18 @@ impl Gtfs<'_> {
}
}
}
arrivals.sort();
debug!("Found {} arrivals", arrivals.len());
// Update real-time deltas
let realtime_result = self.realtime_update(&mut arrivals);
if realtime_result.is_err() {
warn!("Unable to update realtime arrivals: {:#?}", realtime_result.err().unwrap()._message)
}
return Some(arrivals);
}

View File

@ -1,15 +1,14 @@
use log::debug;
/***
* Implementation of GTFS-R polling
*/
use reqwest::{StatusCode, blocking::Client};
use std::{collections::HashMap, time::Duration};
use std::{str::FromStr, time::Duration};
use reqwest::{blocking::Client, header::{HeaderName, HeaderValue}};
use crate::gtfs::structs::{Arrival, Error, Gtfs};
use crate::gtfs::{self, structs::{Arrival, Error, Gtfs}};
impl Gtfs<'_> {
fn realtime_update (&self, arrivals: Vec<Arrival<'_>>) -> Result<bool, Error>{
pub(crate) fn realtime_update (&self, arrivals: &mut Vec<Arrival<'_>>) -> Result<(), gtfs::structs::Error>{
// Poll GTFS-R API
let client = Client::builder()
@ -17,20 +16,62 @@ impl Gtfs<'_> {
.connect_timeout(Duration::from_secs(5))
.build()?;
let mut response = client
.get(self.preferences.realtime_url)
.header("x-api-key", self.preferences.realtime_api_key)
let response = client
.get(&self.preferences.realtime_url)
.header("x-api-key", &self.preferences.realtime_api_key)
.send()?;
if response.status() != StatusCode::OK {
return Err(Error {
_message : format!("HTTP Respomse: {:#?}. Payload: \n{:#?}\n", response.status(), response.text().unwrap())
})
}
// Parse response
let response = json::rdr
let realtime_info: gtfs_realtime::FeedMessage
= serde_json::from_reader(response)?;
// Build a map of (trip, stop) -> Arrival for faster lookup
let mut lookup: HashMap<(String, String), usize> = HashMap::new();
let num_arrivals = arrivals.len();
for i in 0..num_arrivals {
let arrival = arrivals.get(i).unwrap();
lookup.insert((arrival.trip.id.clone(), arrival.stop.id.clone()), i);
}
// Match deltas to existing arrivals
for entity in realtime_info.entity {
if entity.trip_update.is_some() && entity.stop.is_some() {
let trip_update = entity.trip_update.unwrap();
// Update the arrival times
// Look up the entry in arrivals for the current
for stop_update in trip_update.stop_time_update {
if stop_update.stop_id.is_none() {
continue;
}
let trip_id = trip_update.trip.trip_id.clone().unwrap();
let stop_id = stop_update.stop_id.unwrap();
let key = (trip_id, stop_id);
if lookup.contains_key(&key) {
// Update the arrival time
let arrival_index = lookup.get(&key);
let arrival: &mut Arrival<'_> = arrivals.get_mut(*arrival_index.unwrap()).unwrap();
if let Some(update) = stop_update.departure.or(stop_update.arrival) {
let new_time = (((arrival.departure_time as i64)
+ (update.delay.unwrap_or(0) as i64))) as u32;
debug!("Updated arrival time of {:#?} by {}s", &key, update.delay.unwrap());
arrival.departure_time = new_time;
}
}
}
}
}
return Ok(());
}
}
}

View File

@ -28,6 +28,12 @@ impl From<ZipError> for Error {
}
}
impl From<serde_json::Error> for Error {
fn from(value: serde_json::Error) -> Self {
return Error { _message: value.to_string() }
}
}
// This is to store the preferences for the GTFS(-R) side of the code.
#[derive(Debug, PartialEq, Serialize, Deserialize)]