diff --git a/src/ssoss/dynamic_road_object.py b/src/ssoss/dynamic_road_object.py index 3f02e24..0e08238 100644 --- a/src/ssoss/dynamic_road_object.py +++ b/src/ssoss/dynamic_road_object.py @@ -374,84 +374,81 @@ def drive_gpx(self, gpx_filename: str, use_pickle_file=False) -> pd.DataFrame: def drive_gpx_stop_bar(self, gpx_filename, use_pickle_file=True) -> pd.DataFrame: - """ Load the GPX file into a dataframe with timestamp, location, speed, dist to event, bearing, and - id of approaching intersection - - :param gpx_directory: file directory where .gpx file is for input - :param gpx_filename: filename of .gpx file (without .gpx) - :param out_file_directory: where output files are saved to (./out/) - :param use_pickle_file: default to False, can be faster to load from pickle - :return: DataFrame with GPX calculated for sro file and saved a CSV and Pickle file of gpx information + """Process GPX points and generate a summary dataframe. + :param gpx_filename: filename of .gpx file (without extension) + :param use_pickle_file: read cached pickle if available + :return: DataFrame with processed GPX information """ - # self.gpx_filepath = gpx_directory + gpx_filename + ".gpx" pickle_file = self.out_file_path / (str(gpx_filename) + ".p") csv_file = self.out_file_path / (str(gpx_filename) + ".csv") - approach_sb_log_df = None - if use_pickle_file and os.path.isfile(pickle_file): - approach_sb_log_df = pd.read_pickle(pickle_file) - return approach_sb_log_df - else: - # dictionary-> Keys:Values - appr_dict = { - "id": [], - "appr_dir": [], - "timestamp": [], - "time_delta": [], - "location": [], - "spd": [], - "distance": [], - "bearing": [], - "approaching": [] - } + return pd.read_pickle(pickle_file) - for i in tqdm(range(2, self.gpx_df.last_valid_index()), - desc="Loading GPX:", - unit="GPX Points"): + appr_dict = self.parse_gpx_points() + return self.write_summary(appr_dict, csv_file, pickle_file) - self.update_location_simple(i) - cai = self.get_closest_approaching_intersection() - if cai is None: - if i == self.gpx_df.last_valid_index() - 1: - approach_sb_log_df = pd.DataFrame(appr_dict) - approach_sb_log_df.to_csv(csv_file) - approach_sb_log_df.to_pickle(pickle_file) - print( - f"exported dataframe to CSV (in {csv_file}) and Pickle (in {pickle_file})" - ) - else: - pass - else: - approaching_sd = False - appr_distance = (cai.distance_from_sb( - self.get_location(), self.approach_leg(cai)) - - cai.get_sd(self.approach_leg(cai))) - # print(f'{i}: {appr_distance}ft from {cai.get_name()}') - if appr_distance > 0: - approaching_sd = True + def parse_gpx_points(self) -> dict: + """Iterate through GPX dataframe and collect approach information.""" - appr_dict["id"].append(cai.get_id_num()) - appr_dict["appr_dir"].append(self.approach_leg(cai)) - appr_dict["timestamp"].append(self.get_utc_timestamp()) - appr_dict["time_delta"].append(self.get_time_step()) - appr_dict["location"].append(self.get_location()) - appr_dict["spd"].append(self.get_spd()) - appr_dict["distance"].append(appr_distance) - appr_dict["bearing"].append(self.get_bearing()) - appr_dict["approaching"].append(approaching_sd) + appr_dict = { + "id": [], + "appr_dir": [], + "timestamp": [], + "time_delta": [], + "location": [], + "spd": [], + "distance": [], + "bearing": [], + "approaching": [] + } - if i == self.gpx_df.last_valid_index() - 1: - print("WRITING DICT TO DATAFRAME") - approach_sb_log_df = pd.DataFrame(appr_dict) - approach_sb_log_df.to_csv(csv_file) - approach_sb_log_df.to_pickle(pickle_file) - print( - f"Exported data frame to CSV ({csv_file}) and Pickle ({pickle_file})" - ) - print(f'ApproachSB_DF:{approach_sb_log_df}') + last_idx = self.gpx_df.last_valid_index() + for i in tqdm(range(2, last_idx), desc="Loading GPX:", unit="GPX Points"): + self.update_location_simple(i) + cai = self.get_closest_approaching_intersection() + if not cai: + continue + + appr_distance = ( + cai.distance_from_sb(self.get_location(), self.approach_leg(cai)) - + cai.get_sd(self.approach_leg(cai)) + ) + approaching_sd = appr_distance > 0 + self.update_approach_dict(appr_dict, cai, appr_distance, approaching_sd) + + return appr_dict + + def update_approach_dict( + self, + appr_dict: dict, + cai: Intersection, + appr_distance: float, + approaching_sd: bool, + ) -> None: + """Append approach information for a single GPX point to ``appr_dict``.""" + + appr_dict["id"].append(cai.get_id_num()) + appr_dict["appr_dir"].append(self.approach_leg(cai)) + appr_dict["timestamp"].append(self.get_utc_timestamp()) + appr_dict["time_delta"].append(self.get_time_step()) + appr_dict["location"].append(self.get_location()) + appr_dict["spd"].append(self.get_spd()) + appr_dict["distance"].append(appr_distance) + appr_dict["bearing"].append(self.get_bearing()) + appr_dict["approaching"].append(approaching_sd) + @staticmethod + def write_summary(appr_dict: dict, csv_file: PurePath, pickle_file: PurePath) -> pd.DataFrame: + """Convert dict to DataFrame and write to disk.""" + + approach_sb_log_df = pd.DataFrame(appr_dict) + approach_sb_log_df.to_csv(csv_file) + approach_sb_log_df.to_pickle(pickle_file) + print( + f"Exported data frame to CSV ({csv_file}) and Pickle ({pickle_file})" + ) return approach_sb_log_df def get_street(self, itrsxn: Intersection) -> str: diff --git a/src/ssoss/process_video.py b/src/ssoss/process_video.py index ece7286..6dc3f4b 100644 --- a/src/ssoss/process_video.py +++ b/src/ssoss/process_video.py @@ -166,47 +166,54 @@ def extract_sightings(self, desc_timestamps, project, label_img=True, gen_gif=Fa image_path.mkdir(exist_ok=True, parents=True) capture = cv2.VideoCapture(str(self.video_filepath)) - frame_count = self.get_frame_count() + self.extract_frames_to_images(capture, extract_frames, intersection_desc, image_path) - i = 0 # index for all frames to extract - j = 0 # index for frames list to extract as image - k = 0 # intersection string description counter + if label_img: + self.img_overlay_info_box(self.video_filename, project) + if gen_gif: + self.generate_gif(desc_timestamps, project) - while capture.isOpened() and len(extract_frames) > 0 and i < frame_count: + def extract_frames_to_images(self, capture, extract_frames, intersection_desc, image_path): + """Loop through frames and save requested images.""" - for current_frame in tqdm(range(0, extract_frames[-1]), - desc="Frame Search", - unit=" Frames"): - ret, frame = capture.read() - if ret is False: - print("ERROR: ret is FALSE on OpenCV image") - break - if i == extract_frames[j] and j <= len(extract_frames)-1: - frame_name = str(intersection_desc[j]) + '.jpg' - frame_filepath = image_path / frame_name - cv2.imwrite(str(frame_filepath), frame) - print( - f'PICTURE CAPTURED AT {extract_frames[j]}: {intersection_desc[j]}, Saved {j + 1} picture(s) of {len(extract_frames)}') - j += 1 - k += 1 + if not extract_frames: + capture.release() + return + + j = 0 + last_frame = extract_frames[-1] + for i in tqdm(range(0, last_frame + 1), desc="Frame Search", unit=" frame"): + if not capture.isOpened() or i >= self.get_frame_count(): + break + ret, frame = capture.read() + if not ret: + print("ERROR: ret is FALSE on OpenCV image") + break + if i == extract_frames[j]: + self.save_single_frame(frame, intersection_desc[j], image_path, i, j, len(extract_frames)) + j += 1 if j == len(extract_frames): print("done processing images") - capture.release() break - i += 1 - if i > extract_frames[-1]: - break capture.release() - if label_img: - self.img_overlay_info_box(self.video_filename, project) - if gen_gif: - self.generate_gif(desc_timestamps, project) - """ - if bbox: - self.img_overlay_bbox(description_list,project) - - """ + def save_single_frame( + self, + frame, + description: str, + image_path: Path, + frame_idx: int, + count: int, + total: int, + ) -> None: + """Save a single frame to ``image_path`` and log progress.""" + + frame_name = f"{description}.jpg" + frame_filepath = image_path / frame_name + cv2.imwrite(str(frame_filepath), frame) + print( + f"PICTURE CAPTURED AT {frame_idx}: {description}, Saved {count + 1} picture(s) of {total}" + ) # TODO: convert to start_sec, start_min=0, end_sec, end_min=0, folder="")