Force garbage collection in multiple places during data load to avoid OOMs on the Pi Zero

This commit is contained in:
Nahuel Lofeudo 2023-04-23 07:43:45 +01:00
parent 4a7a09fce7
commit cbad15a6fd
1 changed file with 13 additions and 3 deletions

View File

@ -1,18 +1,16 @@
from arrival_times import ArrivalTime from arrival_times import ArrivalTime
import datetime import datetime
import gc
import gtfs_kit as gk import gtfs_kit as gk
import json import json
import os import os
import pandas as pd import pandas as pd
import queue import queue
import re
import refresh_feed import refresh_feed
import requests import requests
import tempfile
import time import time
import threading import threading
import traceback import traceback
import shutil
import zipfile import zipfile
class GTFSClient(): class GTFSClient():
@ -37,6 +35,7 @@ class GTFSClient():
# Load the feed # Load the feed
self.feed = self._read_feed(feed_name, dist_units='km', stop_codes = stop_codes) self.feed = self._read_feed(feed_name, dist_units='km', stop_codes = stop_codes)
gc.collect()
self.stop_ids = self.__wanted_stop_ids() self.stop_ids = self.__wanted_stop_ids()
self.deltas = {} self.deltas = {}
self.canceled_trips = set() self.canceled_trips = set()
@ -71,6 +70,8 @@ class GTFSClient():
if not path.exists(): if not path.exists():
raise ValueError(f"Path {path} does not exist") raise ValueError(f"Path {path} does not exist")
gc.collect()
feed_dict = {table: None for table in gk.cs.GTFS_REF["table"]} feed_dict = {table: None for table in gk.cs.GTFS_REF["table"]}
with zipfile.ZipFile(path) as z: with zipfile.ZipFile(path) as z:
for filename in FILES_TO_LOAD: for filename in FILES_TO_LOAD:
@ -81,6 +82,8 @@ class GTFSClient():
if not df.empty: if not df.empty:
feed_dict[table] = gk.cn.clean_column_names(df) feed_dict[table] = gk.cn.clean_column_names(df)
gc.collect()
# Finally, load stop_times.txt # Finally, load stop_times.txt
# Obtain the list of IDs of the desired stops. This is similar to what __wanted_stop_ids() does, # Obtain the list of IDs of the desired stops. This is similar to what __wanted_stop_ids() does,
# but without a dependency on a fully formed feed object # but without a dependency on a fully formed feed object
@ -89,11 +92,16 @@ class GTFSClient():
iter_csv = pd.read_csv(f, iterator=True, chunksize=1000, dtype=gk.cs.DTYPE, encoding="utf-8-sig") iter_csv = pd.read_csv(f, iterator=True, chunksize=1000, dtype=gk.cs.DTYPE, encoding="utf-8-sig")
df = pd.concat([chunk[chunk["stop_id"].isin(wanted_stop_ids)] for chunk in iter_csv]) df = pd.concat([chunk[chunk["stop_id"].isin(wanted_stop_ids)] for chunk in iter_csv])
gc.collect()
if not df.empty: if not df.empty:
# Fix arrival and departure times so that comparisons work the way they are expected to # Fix arrival and departure times so that comparisons work the way they are expected to
df["arrival_time"] = df.apply(lambda row: row["arrival_time"] if len(row["arrival_time"]) == 8 else "0"+row["arrival_time"], axis=1) df["arrival_time"] = df.apply(lambda row: row["arrival_time"] if len(row["arrival_time"]) == 8 else "0"+row["arrival_time"], axis=1)
gc.collect()
df["departure_time"] = df.apply(lambda row: row["departure_time"] if len(row["departure_time"]) == 8 else "0"+row["departure_time"], axis=1) df["departure_time"] = df.apply(lambda row: row["departure_time"] if len(row["departure_time"]) == 8 else "0"+row["departure_time"], axis=1)
gc.collect()
feed_dict["stop_times"] = gk.cn.clean_column_names(df) feed_dict["stop_times"] = gk.cn.clean_column_names(df)
gc.collect()
feed_dict["dist_units"] = dist_units feed_dict["dist_units"] = dist_units
@ -296,6 +304,8 @@ class GTFSClient():
if self._update_queue: if self._update_queue:
self._update_queue.put(arrivals) self._update_queue.put(arrivals)
gc.collect()
return arrivals return arrivals