Final set of changes for realtime support.
This commit is contained in:
parent
8496cfd72f
commit
a2ccb9b761
|
|
@ -5,9 +5,9 @@ mod refresher;
|
||||||
mod realtime;
|
mod realtime;
|
||||||
pub mod structs;
|
pub mod structs;
|
||||||
use chrono::{DateTime, Local, Timelike};
|
use chrono::{DateTime, Local, Timelike};
|
||||||
use log::{debug, trace};
|
use log::{debug, trace, warn};
|
||||||
use std::{collections::{HashMap, HashSet}, fs::File };
|
use std::{collections::{HashMap, HashSet}, fs::File };
|
||||||
use gtfs_structures::{Exception, RawTrip};
|
use gtfs_structures::{ContinuousPickupDropOff::ArrangeByPhone, Exception, RawTrip};
|
||||||
use crate::gtfs::{loader::load_gtfs, structs::{Arrival, Gtfs, Preferences, Error}};
|
use crate::gtfs::{loader::load_gtfs, structs::{Arrival, Gtfs, Preferences, Error}};
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -82,9 +82,18 @@ impl Gtfs<'_> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
arrivals.sort();
|
arrivals.sort();
|
||||||
|
|
||||||
debug!("Found {} arrivals", arrivals.len());
|
debug!("Found {} arrivals", arrivals.len());
|
||||||
|
|
||||||
|
// Update real-time deltas
|
||||||
|
|
||||||
|
let realtime_result = self.realtime_update(&mut arrivals);
|
||||||
|
if realtime_result.is_err() {
|
||||||
|
warn!("Unable to update realtime arrivals: {:#?}", realtime_result.err().unwrap()._message)
|
||||||
|
}
|
||||||
|
|
||||||
return Some(arrivals);
|
return Some(arrivals);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,14 @@
|
||||||
|
use log::debug;
|
||||||
/***
|
/***
|
||||||
* Implementation of GTFS-R polling
|
* Implementation of GTFS-R polling
|
||||||
*/
|
*/
|
||||||
use reqwest::{blocking::Client, header::{HeaderName, HeaderValue}};
|
use reqwest::{StatusCode, blocking::Client};
|
||||||
use std::{collections::HashMap, str::FromStr, time::Duration};
|
use std::{collections::HashMap, time::Duration};
|
||||||
use serde_json::{Value, de::Read};
|
|
||||||
|
|
||||||
use crate::gtfs::{self, structs::{Arrival, Error, Gtfs}};
|
use crate::gtfs::{self, structs::{Arrival, Error, Gtfs}};
|
||||||
|
|
||||||
impl Gtfs<'_> {
|
impl Gtfs<'_> {
|
||||||
fn realtime_update (&self, arrivals: Vec<Arrival<'_>>) -> Result<(), gtfs::structs::Error>{
|
pub(crate) fn realtime_update (&self, arrivals: &mut Vec<Arrival<'_>>) -> Result<(), gtfs::structs::Error>{
|
||||||
|
|
||||||
// Poll GTFS-R API
|
// Poll GTFS-R API
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
|
|
@ -21,15 +21,23 @@ impl Gtfs<'_> {
|
||||||
.header("x-api-key", &self.preferences.realtime_api_key)
|
.header("x-api-key", &self.preferences.realtime_api_key)
|
||||||
.send()?;
|
.send()?;
|
||||||
|
|
||||||
|
if response.status() != StatusCode::OK {
|
||||||
|
return Err(Error {
|
||||||
|
_message : format!("HTTP Respomse: {:#?}. Payload: \n{:#?}\n", response.status(), response.text().unwrap())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Parse response
|
// Parse response
|
||||||
let realtime_info: gtfs_realtime::FeedMessage
|
let realtime_info: gtfs_realtime::FeedMessage
|
||||||
= serde_json::from_reader(response)?;
|
= serde_json::from_reader(response)?;
|
||||||
|
|
||||||
|
|
||||||
// Build a map of (trip, stop) -> Arrival for faster lookup
|
// Build a map of (trip, stop) -> Arrival for faster lookup
|
||||||
let mut lookup: HashMap<(&String, &String), mut Arrival<'_>> = HashMap::new();
|
let mut lookup: HashMap<(String, String), usize> = HashMap::new();
|
||||||
for arrival in arrivals {
|
let num_arrivals = arrivals.len();
|
||||||
lookup.insert((&(arrival.trip.id), &(arrival.stop.id)), arrival);
|
for i in 0..num_arrivals {
|
||||||
|
let arrival = arrivals.get(i).unwrap();
|
||||||
|
lookup.insert((arrival.trip.id.clone(), arrival.stop.id.clone()), i);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Match deltas to existing arrivals
|
// Match deltas to existing arrivals
|
||||||
|
|
@ -42,16 +50,18 @@ impl Gtfs<'_> {
|
||||||
if stop_update.stop_id.is_none() {
|
if stop_update.stop_id.is_none() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let trip_id = trip_update.trip.trip_id.as_ref().unwrap();
|
let trip_id = trip_update.trip.trip_id.clone().unwrap();
|
||||||
let key = (trip_id, &(stop_update.stop_id.unwrap()));
|
let stop_id = stop_update.stop_id.unwrap();
|
||||||
|
let key = (trip_id, stop_id);
|
||||||
if lookup.contains_key(&key) {
|
if lookup.contains_key(&key) {
|
||||||
|
|
||||||
// Update the arrival time
|
// Update the arrival time
|
||||||
let arrival_value = lookup.get(&key);
|
let arrival_index = lookup.get(&key);
|
||||||
let arrival: &&mut Arrival<'_> = arrival_value.unwrap();
|
let arrival: &mut Arrival<'_> = arrivals.get_mut(*arrival_index.unwrap()).unwrap();
|
||||||
if let Some(update) = stop_update.departure.or(stop_update.arrival) {
|
if let Some(update) = stop_update.departure.or(stop_update.arrival) {
|
||||||
let new_time = (((arrival.departure_time as i64)
|
let new_time = (((arrival.departure_time as i64)
|
||||||
+ (update.delay.unwrap_or(0) as i64))) as u32;
|
+ (update.delay.unwrap_or(0) as i64))) as u32;
|
||||||
|
debug!("Updated arrival time of {:#?} by {}s", &key, update.delay.unwrap());
|
||||||
arrival.departure_time = new_time;
|
arrival.departure_time = new_time;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,6 @@ use std::collections::{HashMap, HashSet};
|
||||||
use gtfs_structures::{Agency, Calendar, CalendarDate, RawStopTime, RawTrip, Route, Stop};
|
use gtfs_structures::{Agency, Calendar, CalendarDate, RawStopTime, RawTrip, Route, Stop};
|
||||||
use log::error;
|
use log::error;
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
use serde_json::value;
|
|
||||||
use zip::result::ZipError;
|
use zip::result::ZipError;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue