Skip to content

Commit

Permalink
Add script to automate running analysis on logs taken during a drive …
Browse files Browse the repository at this point in the history
…test.
  • Loading branch information
Jonathan Diamond committed Apr 16, 2024
1 parent 8219ba9 commit 40ace77
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 10 deletions.
19 changes: 10 additions & 9 deletions python/fusion_engine_client/analysis/pose_compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,18 @@ def __init__(self,
self.output_dir = output_dir
self.prefix = prefix

if time_axis in ('relative', 'rel'):
self.time_axis = 'relative'

gps_time_test = self.test_pose.gps_time
valid_gps_time = gps_time_test[np.isfinite(gps_time_test)]
gps_time_test = self.test_pose.gps_time
valid_gps_time = gps_time_test[np.isfinite(gps_time_test)]
if len(valid_gps_time) == 0:
raise ValueError('Test data had no valid GPS Times.')

if len(valid_gps_time) > 0:
self.t0 = valid_gps_time[0]
else:
self.t0 = Timestamp()
if np.all(self.test_pose.solution_type == SolutionType.Invalid):
raise ValueError(f'Test data had no valid position solutions.')

if time_axis in ('relative', 'rel'):
self.time_axis = 'relative'
self.t0 = valid_gps_time[0]
self.gps_time_label = 'Relative Time (sec)'
elif time_axis in ('absolute', 'abs'):
self.time_axis = 'absolute'
Expand All @@ -200,7 +201,7 @@ def __init__(self,

self.pose_index_maps = self._get_log_pose_mapping()

if len(self.pose_index_maps) == 0:
if len(self.pose_index_maps[0]) == 0:
raise ValueError('Test and reference logs did not have overlapping GPS times.')

def _get_log_pose_mapping(self):
Expand Down
153 changes: 153 additions & 0 deletions python/fusion_engine_client/applications/p1_drive_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#!/usr/bin/env python3

import io
import sys
import os
import json
from pathlib import Path
from typing import Dict

import boto3

if __package__ is None or __package__ == "":
from import_utils import enable_relative_imports
__package__ = enable_relative_imports(__name__, __file__)

from ..analysis.pose_compare import main as pose_compare_main
from ..utils import trace as logging
from ..utils.argument_parser import ArgumentParser

S3_DEFAULT_INGEST_BUCKET = 'pointone-ingest-landingpad'
S3_DEFAULT_REGION = 'us-west-1'
META_FILE = "drive_test_metadata.json"
LOG_DIR = Path('/logs/drive_analysis')
MANIFEST_FILE = 'maniphest.json'
TEST_LOG_FILE = 'output/diagnostics.p1log'
REFERENCE_LOG_FILE = 'fusion_engine.p1log'


_logger = logging.getLogger('point_one.fusion_engine.applications.drive_analysis')

s3_client = boto3.client('s3', region_name=S3_DEFAULT_REGION)


def download_to_memory(s3_key) -> bytes:
file_stream = io.BytesIO()
s3_client.download_fileobj(S3_DEFAULT_INGEST_BUCKET, s3_key, file_stream)
file_stream.seek(0)
return file_stream.read()


def find_logs(prefix, log_guids) -> Dict[str, str]:
resp = {}
paginator = s3_client.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=S3_DEFAULT_INGEST_BUCKET, Prefix=prefix)
for page in pages:
for content in page['Contents']:
for uuid in log_guids:
if uuid in content['Key']:
offset = content['Key'].index(uuid)
resp[uuid] = content['Key'][:offset] + uuid
break
return resp


def main():
parser = ArgumentParser(description="""\
Run p1_pose_compare for each device included in a drive test.
This tool downloads the relevant files from S3 and prompts stdin before moving on to the next log.
""")
parser.add_argument('-v', '--verbose', action='count', default=0,
help="Print verbose/trace debugging messages.")

parser.add_argument(
'key_for_log_in_drive',
help="The full S3 key for one of the logs in the drive.\n"
"Ex. '2024-04-04/p1-lexus-rack-2/a0a0ff472ea342809d05380d8fe54399'")

options = parser.parse_args()

# Configure logging.
if options.verbose >= 1:
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s',
stream=sys.stdout)
if options.verbose == 1:
logging.getLogger('point_one.fusion_engine.analysis.pose_compare').setLevel(logging.DEBUG)
else:
logging.getLogger('point_one.fusion_engine.analysis.pose_compare').setLevel(
logging.getTraceLevel(depth=options.verbose - 1))
else:
logging.basicConfig(level=logging.INFO, format='%(message)s', stream=sys.stdout)

key_split = options.key_for_log_in_drive.split('/')

if len(key_split) < 3:
_logger.error(
f'Key had unexpected prefix. Expecting S3 key like "2024-04-04/p1-lexus-rack-2/a0a0ff472ea342809d05380d8fe54399".')
exit(1)
elif len(key_split) > 3:
options.key_for_log_in_drive = '/'.join(key_split[:3])
_logger.warning(
f'Key had unexpected prefix. Expecting S3 key like "2024-04-04/p1-lexus-rack-2/a0a0ff472ea342809d05380d8fe54399". Only using "{options.key_for_log_in_drive}".')

prefix = options.key_for_log_in_drive.split('/')[0]

os.makedirs(LOG_DIR, exist_ok=True)

try:
meta_key = options.key_for_log_in_drive + '/' + META_FILE
drive_meta_data = download_to_memory(options.key_for_log_in_drive + '/' + META_FILE)
except:
_logger.error(f'Could not find "S3://{S3_DEFAULT_INGEST_BUCKET}/{meta_key}". Make sure this log was taken as part of a drive test collection.')
exit(1)

drive_meta = json.loads(drive_meta_data.decode('utf-8'))

reference_guid = drive_meta['drive_reference_log']

_logger.info(f'Using reference log: {reference_guid}')

test_guids = drive_meta['drive_logs']

logs_to_download = []
log_paths = {}
for guid in [reference_guid] + test_guids:
log_paths[guid] = LOG_DIR / f'{guid}.p1log'
if not log_paths[guid].exists():
logs_to_download.append(guid)
else:
_logger.info(f'Using cached: {log_paths[guid]}')

if len(logs_to_download) > 0:
log_prefixes = find_logs(prefix, logs_to_download)
for guid in logs_to_download:
if guid not in log_prefixes:
if reference_guid == guid:
_logger.error(
f"Could't find test log: {guid}. Make sure collection wasn't on day boundary. Continuing without it.")
test_guids.remove(guid)
else:
_logger.error(f"Could't find reference log: {guid}. Make sure collection wasn't on day boundary.")
exit(1)

for guid, s3_prefix in log_prefixes.items():
_logger.info(f'Downloading: {log_paths[guid]}')
if reference_guid == guid:
reference_p1log_key = s3_prefix + '/' + REFERENCE_LOG_FILE
else:
reference_p1log_key = s3_prefix + '/' + TEST_LOG_FILE

s3_client.download_file(S3_DEFAULT_INGEST_BUCKET, reference_p1log_key, log_paths[guid])

for guid in test_guids:
_logger.info(f'Comparing log: {guid}')
sys.argv = ['pose_compare_main', str(log_paths[guid]), str(log_paths[reference_guid]), '--time-axis=rel']
try:
pose_compare_main()
except Exception as e:
_logger.error(f'Failure: {e}')
input("Press Enter To Process Next Log")


if __name__ == '__main__':
main()
6 changes: 5 additions & 1 deletion python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ def find_version(*file_paths):
'packaging>=21.0.0',
])

all_requirements = message_requirements | tools_requirements | display_requirements | dev_requirements
internal_only_requirements = set([
'boto3>=1.34.79',
])

all_requirements = message_requirements | tools_requirements | display_requirements | dev_requirements | internal_only_requirements

setup(
name='fusion-engine-client',
Expand Down

0 comments on commit 40ace77

Please sign in to comment.