From 5ef8c71841a47020687c0c61d2b43973a33d743e Mon Sep 17 00:00:00 2001 From: Jonathan Diamond Date: Mon, 8 Apr 2024 10:33:58 -0700 Subject: [PATCH] Add script to automate running analysis on logs taken during a drive test. --- .../analysis/pose_compare.py | 19 +-- .../applications/p1_drive_analysis.py | 153 ++++++++++++++++++ python/setup.py | 6 +- 3 files changed, 168 insertions(+), 10 deletions(-) create mode 100755 python/fusion_engine_client/applications/p1_drive_analysis.py diff --git a/python/fusion_engine_client/analysis/pose_compare.py b/python/fusion_engine_client/analysis/pose_compare.py index c3dc8bf1..d8af45c6 100755 --- a/python/fusion_engine_client/analysis/pose_compare.py +++ b/python/fusion_engine_client/analysis/pose_compare.py @@ -169,17 +169,18 @@ def __init__(self, self.output_dir = output_dir self.prefix = prefix - if time_axis in ('relative', 'rel'): - self.time_axis = 'relative' - gps_time_test = self.test_pose.gps_time - valid_gps_time = gps_time_test[np.isfinite(gps_time_test)] + gps_time_test = self.test_pose.gps_time + valid_gps_time = gps_time_test[np.isfinite(gps_time_test)] + if len(valid_gps_time) == 0: + raise ValueError('Test data had no valid GPS Times.') - if len(valid_gps_time) > 0: - self.t0 = valid_gps_time[0] - else: - self.t0 = Timestamp() + if np.all(self.test_pose.solution_type == SolutionType.Invalid): + raise ValueError(f'Test data had no valid position solutions.') + if time_axis in ('relative', 'rel'): + self.time_axis = 'relative' + self.t0 = valid_gps_time[0] self.gps_time_label = 'Relative Time (sec)' elif time_axis in ('absolute', 'abs'): self.time_axis = 'absolute' @@ -200,7 +201,7 @@ def __init__(self, self.pose_index_maps = self._get_log_pose_mapping() - if len(self.pose_index_maps) == 0: + if len(self.pose_index_maps[0]) == 0: raise ValueError('Test and reference logs did not have overlapping GPS times.') def _get_log_pose_mapping(self): diff --git a/python/fusion_engine_client/applications/p1_drive_analysis.py b/python/fusion_engine_client/applications/p1_drive_analysis.py new file mode 100755 index 00000000..21445b1b --- /dev/null +++ b/python/fusion_engine_client/applications/p1_drive_analysis.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 + +import io +import sys +import os +import json +from pathlib import Path +from typing import Dict + +import boto3 + +if __package__ is None or __package__ == "": + from import_utils import enable_relative_imports + __package__ = enable_relative_imports(__name__, __file__) + +from ..analysis.pose_compare import main as pose_compare_main +from ..utils import trace as logging +from ..utils.argument_parser import ArgumentParser + +S3_DEFAULT_INGEST_BUCKET = 'pointone-ingest-landingpad' +S3_DEFAULT_REGION = 'us-west-1' +META_FILE = "drive_test_metadata.json" +LOG_DIR = Path('/logs/drive_analysis') +MANIFEST_FILE = 'maniphest.json' +TEST_LOG_FILE = 'output/diagnostics.p1log' +REFERENCE_LOG_FILE = 'fusion_engine.p1log' + + +_logger = logging.getLogger('point_one.fusion_engine.applications.drive_analysis') + +s3_client = boto3.client('s3', region_name=S3_DEFAULT_REGION) + + +def download_to_memory(s3_key) -> bytes: + file_stream = io.BytesIO() + s3_client.download_fileobj(S3_DEFAULT_INGEST_BUCKET, s3_key, file_stream) + file_stream.seek(0) + return file_stream.read() + + +def find_logs(prefix, log_guids) -> Dict[str, str]: + resp = {} + paginator = s3_client.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=S3_DEFAULT_INGEST_BUCKET, Prefix=prefix) + for page in pages: + for content in page['Contents']: + for uuid in log_guids: + if uuid in content['Key']: + offset = content['Key'].index(uuid) + resp[uuid] = content['Key'][:offset] + uuid + break + return resp + + +def main(): + parser = ArgumentParser(description="""\ + Run p1_pose_compare for each device included in a drive test. + This tool downloads the relevant files from S3 and prompts stdin before moving on to the next log. + """) + parser.add_argument('-v', '--verbose', action='count', default=0, + help="Print verbose/trace debugging messages.") + + parser.add_argument( + 'key_for_log_in_drive', + help="The full S3 key for one of the logs in the drive.\n" + "Ex. '2024-04-04/p1-lexus-rack-2/a0a0ff472ea342809d05380d8fe54399'") + + options = parser.parse_args() + + # Configure logging. + if options.verbose >= 1: + logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s', + stream=sys.stdout) + if options.verbose == 1: + logging.getLogger('point_one.fusion_engine.analysis.pose_compare').setLevel(logging.DEBUG) + else: + logging.getLogger('point_one.fusion_engine.analysis.pose_compare').setLevel( + logging.getTraceLevel(depth=options.verbose - 1)) + else: + logging.basicConfig(level=logging.INFO, format='%(message)s', stream=sys.stdout) + + key_split = options.key_for_log_in_drive.split('/') + + if len(key_split) < 3: + _logger.error( + f'Key had unexpected prefix. Expecting S3 key like "2024-04-04/p1-lexus-rack-2/a0a0ff472ea342809d05380d8fe54399".') + exit(1) + elif len(key_split) > 3: + options.key_for_log_in_drive = '/'.join(key_split[:3]) + _logger.warning( + f'Key had unexpected prefix. Expecting S3 key like "2024-04-04/p1-lexus-rack-2/a0a0ff472ea342809d05380d8fe54399". Only using "{options.key_for_log_in_drive}".') + + prefix = options.key_for_log_in_drive.split('/')[0] + + os.makedirs(LOG_DIR, exist_ok=True) + + try: + meta_key = options.key_for_log_in_drive + '/' + META_FILE + drive_meta_data = download_to_memory(options.key_for_log_in_drive + '/' + META_FILE) + except: + _logger.error(f'Could not find "S3://{S3_DEFAULT_INGEST_BUCKET}/{meta_key}". Make sure this log was taken as part of a drive test collection.') + exit(1) + + drive_meta = json.loads(drive_meta_data.decode('utf-8')) + + reference_guid = drive_meta['drive_reference_log'] + + _logger.info(f'Using reference log: {reference_guid}') + + test_guids = drive_meta['drive_logs'] + + logs_to_download = [] + log_paths = {} + for guid in [reference_guid] + test_guids: + log_paths[guid] = LOG_DIR / f'{guid}.p1log' + if not log_paths[guid].exists(): + logs_to_download.append(guid) + else: + _logger.info(f'Using cached: {log_paths[guid]}') + + if len(logs_to_download) > 0: + log_prefixes = find_logs(prefix, logs_to_download) + for guid in logs_to_download: + if guid not in log_prefixes: + if reference_guid == guid: + _logger.error( + f"Could't find test log: {guid}. Make sure collection wasn't on day boundary. Continuing without it.") + test_guids.remove(guid) + else: + _logger.error(f"Could't find reference log: {guid}. Make sure collection wasn't on day boundary.") + exit(1) + + for guid, s3_prefix in log_prefixes.items(): + _logger.info(f'Downloading: {log_paths[guid]}') + if reference_guid == guid: + reference_p1log_key = s3_prefix + '/' + REFERENCE_LOG_FILE + else: + reference_p1log_key = s3_prefix + '/' + TEST_LOG_FILE + + s3_client.download_file(S3_DEFAULT_INGEST_BUCKET, reference_p1log_key, log_paths[guid]) + + for guid in test_guids: + _logger.info(f'Comparing log: {guid}') + sys.argv = ['pose_compare_main', str(log_paths[guid]), str(log_paths[reference_guid]), '--time-axis=rel'] + try: + pose_compare_main() + except Exception as e: + _logger.error(f'Failure: {e}') + input("Press Enter To Process Next Log") + + +if __name__ == '__main__': + main() diff --git a/python/setup.py b/python/setup.py index a4ee264c..ab723dfd 100644 --- a/python/setup.py +++ b/python/setup.py @@ -42,7 +42,11 @@ def find_version(*file_paths): 'packaging>=21.0.0', ]) -all_requirements = message_requirements | tools_requirements | display_requirements | dev_requirements +internal_only_requirements = set([ + 'boto3>=1.34.79', +]) + +all_requirements = message_requirements | tools_requirements | display_requirements | dev_requirements | internal_only_requirements setup( name='fusion-engine-client',