From 9885633ae20f4ec9a5a9a4067e2111eee3419538 Mon Sep 17 00:00:00 2001 From: Lucas Saavedra Vaz <32426024+lucasssvaz@users.noreply.github.com> Date: Wed, 22 May 2024 13:25:37 -0300 Subject: [PATCH] Fix formatting and add checksum comparison --- tools/get.py | 222 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 135 insertions(+), 87 deletions(-) diff --git a/tools/get.py b/tools/get.py index b06b37edb19..30a87f43321 100755 --- a/tools/get.py +++ b/tools/get.py @@ -23,8 +23,7 @@ import zipfile import re import time -from datetime import timedelta -import functools +import argparse # Initialize start_time globally start_time = -1 @@ -67,10 +66,12 @@ def mkdir_p(path): if exc.errno != errno.EEXIST or not os.path.isdir(path): raise + def format_time(seconds): minutes, seconds = divmod(seconds, 60) return "{:02}:{:05.2f}".format(int(minutes), seconds) + def report_progress(block_count, block_size, total_size, start_time): downloaded_size = block_count * block_size time_elapsed = time.time() - start_time @@ -79,121 +80,129 @@ def report_progress(block_count, block_size, total_size, start_time): if sys.stdout.isatty(): if total_size > 0: percent_complete = min((downloaded_size / total_size) * 100, 100) - sys.stdout.write(f"\rDownloading... {percent_complete:.2f}% - {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s") + sys.stdout.write( + f"\rDownloading... {percent_complete:.2f}% - {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s" # noqa: E501 + ) else: - sys.stdout.write(f"\rDownloading... {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s") + sys.stdout.write( + f"\rDownloading... {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s" # noqa: E501 + ) sys.stdout.flush() + def print_verification_progress(total_files, i, t1): if sys.stdout.isatty(): - sys.stdout.write(f'\rElapsed time {format_time(time.time() - t1)}') + sys.stdout.write(f"\rElapsed time {format_time(time.time() - t1)}") sys.stdout.flush() + def verify_files(filename, destination, rename_to): # Set the path of the extracted directory extracted_dir_path = destination t1 = time.time() if filename.endswith(".zip"): try: - with zipfile.ZipFile(filename, 'r') as archive: - first_dir = archive.namelist()[0].split('/')[0] + with zipfile.ZipFile(filename, "r") as archive: + first_dir = archive.namelist()[0].split("/")[0] total_files = len(archive.namelist()) for i, zipped_file in enumerate(archive.namelist(), 1): local_path = os.path.join(extracted_dir_path, zipped_file.replace(first_dir, rename_to, 1)) if not os.path.exists(local_path): - #print(f'\nMissing {zipped_file} on location: {extracted_dir_path}') + # print(f'\nMissing {zipped_file} on location: {extracted_dir_path}') print(f"Verification failed; aborted in {format_time(time.time() - t1)}") return False - print_verification_progress(total_files, i , t1) + print_verification_progress(total_files, i, t1) except zipfile.BadZipFile: print(f"Verification failed; aborted in {format_time(time.time() - t1)}") return False elif filename.endswith(".tar.gz"): try: - with tarfile.open(filename, 'r:gz') as archive: - first_dir = archive.getnames()[0].split('/')[0] + with tarfile.open(filename, "r:gz") as archive: + first_dir = archive.getnames()[0].split("/")[0] total_files = len(archive.getnames()) for i, zipped_file in enumerate(archive.getnames(), 1): local_path = os.path.join(extracted_dir_path, zipped_file.replace(first_dir, rename_to, 1)) if not os.path.exists(local_path): - #print(f'\nMissing {zipped_file} on location: {extracted_dir_path}') + # print(f'\nMissing {zipped_file} on location: {extracted_dir_path}') print(f"Verification failed; aborted in {format_time(time.time() - t1)}") return False - print_verification_progress(total_files, i , t1) + print_verification_progress(total_files, i, t1) except tarfile.ReadError: print(f"Verification failed; aborted in {format_time(time.time() - t1)}") return False elif filename.endswith(".tar.xz"): try: - with tarfile.open(filename, 'r:xz') as archive: - first_dir = archive.getnames()[0].split('/')[0] + with tarfile.open(filename, "r:xz") as archive: + first_dir = archive.getnames()[0].split("/")[0] total_files = len(archive.getnames()) for i, zipped_file in enumerate(archive.getnames(), 1): local_path = os.path.join(extracted_dir_path, zipped_file.replace(first_dir, rename_to, 1)) if not os.path.exists(local_path): - #print(f'\nMissing {zipped_file} on location: {extracted_dir_path}') + print(f'\nMissing {zipped_file} on location: {extracted_dir_path}') print(f"Verification failed; aborted in {format_time(time.time() - t1)}") return False - print_verification_progress(total_files, i , t1) + print_verification_progress(total_files, i, t1) except tarfile.ReadError: print(f"Verification failed; aborted in {format_time(time.time() - t1)}") return False else: - raise NotImplementedError('Unsupported archive type') + raise NotImplementedError("Unsupported archive type") - #if(verbose): - #print(f"\nVerification passed; completed in {format_time(time.time() - t1)}") + # if(verbose): + # print(f"\nVerification passed; completed in {format_time(time.time() - t1)}") return True def unpack(filename, destination, force_extract): - dirname = '' - file_is_corrupted=False - if(not force_extract): - print(f' > Verify archive... ', end="", flush=True) + dirname = "" + cfile = None # Compressed file + file_is_corrupted = False + if not force_extract: + print(" > Verify archive... ", end="", flush=True) try: - if filename.endswith('tar.gz'): + if filename.endswith("tar.gz"): if tarfile.is_tarfile(filename): - tfile = tarfile.open(filename, 'r:gz') - dirname = tfile.getnames()[0] + cfile = tarfile.open(filename, "r:gz") + dirname = cfile.getnames()[0] else: - print('File corrupted!') - file_is_corrupted=True - elif filename.endswith('tar.xz'): + print("File corrupted!") + file_is_corrupted = True + elif filename.endswith("tar.xz"): if tarfile.is_tarfile(filename): - tfile = tarfile.open(filename, 'r:xz') - dirname = tfile.getnames()[0] + cfile = tarfile.open(filename, "r:xz") + dirname = cfile.getnames()[0] else: - print('File corrupted!') - file_is_corrupted=True - elif filename.endswith('zip'): + print("File corrupted!") + file_is_corrupted = True + elif filename.endswith("zip"): if zipfile.is_zipfile(filename): - zfile = zipfile.ZipFile(filename) - dirname = zfile.namelist()[0] + cfile = zipfile.ZipFile(filename) + dirname = cfile.namelist()[0] else: - print('File corrupted!') - file_is_corrupted=True + print("File corrupted!") + file_is_corrupted = True else: - raise NotImplementedError('Unsupported archive type') + raise NotImplementedError("Unsupported archive type") except EOFError: - print(f'File corrupted or incomplete!') - file_is_corrupted=True + print("File corrupted or incomplete!") + cfile = None + file_is_corrupted = True - if(file_is_corrupted): + if file_is_corrupted: corrupted_filename = filename + ".corrupted" os.rename(filename, corrupted_filename) - if(verbose): - print(f'Renaming corrupted archive to {corrupted_filename}') + if verbose: + print(f"Renaming corrupted archive to {corrupted_filename}") return False # A little trick to rename tool directories so they don't contain version number - rename_to = re.match(r'^([a-z][^\-]*\-*)+', dirname).group(0).strip('-') - if rename_to == dirname and dirname.startswith('esp32-arduino-libs-'): - rename_to = 'esp32-arduino-libs' + rename_to = re.match(r"^([a-z][^\-]*\-*)+", dirname).group(0).strip("-") + if rename_to == dirname and dirname.startswith("esp32-arduino-libs-"): + rename_to = "esp32-arduino-libs" if not force_extract: - if(verify_files(filename, destination, rename_to)): + if verify_files(filename, destination, rename_to): print(" Files ok. Skipping Extraction") return True else: @@ -201,15 +210,18 @@ def unpack(filename, destination, force_extract): else: print(" Forcing extraction") - if filename.endswith('tar.gz'): - tfile = tarfile.open(filename, 'r:gz') - tfile.extractall(destination) - elif filename.endswith('tar.xz'): - tfile = tarfile.open(filename, 'r:xz') - tfile.extractall(destination) - elif filename.endswith('zip'): - zfile = zipfile.ZipFile(filename) - zfile.extractall(destination) + if filename.endswith("tar.gz"): + if not cfile: + cfile = tarfile.open(filename, "r:gz") + cfile.extractall(destination) + elif filename.endswith("tar.xz"): + if not cfile: + cfile = tarfile.open(filename, "r:xz") + cfile.extractall(destination) + elif filename.endswith("zip"): + if not cfile: + cfile = zipfile.ZipFile(filename) + cfile.extractall(destination) else: raise NotImplementedError("Unsupported archive type") @@ -221,7 +233,8 @@ def unpack(filename, destination, force_extract): return True -def download_file_with_progress(url,filename, start_time): + +def download_file_with_progress(url, filename, start_time): import ssl import contextlib @@ -246,7 +259,7 @@ def download_file_with_progress(url,filename, start_time): block_count += 1 report_progress(block_count, block_size, total_size, start_time) else: - raise Exception('Non-existing file or connection error') + raise Exception("Non-existing file or connection error") def download_file(url, filename): @@ -268,19 +281,21 @@ def download_file(url, filename): break out_file.write(block) else: - raise Exception ('Non-existing file or connection error') + raise Exception("Non-existing file or connection error") + def get_tool(tool, force_download, force_extract): sys_name = platform.system() archive_name = tool["archiveFileName"] + checksum = tool["checksum"][8:] local_path = dist_dir + archive_name - url = tool['url'] + url = tool["url"] start_time = time.time() if not os.path.isfile(local_path) or force_download: if verbose: - print('Downloading \'' + archive_name + '\' to \'' + local_path + '\'') + print("Downloading '" + archive_name + "' to '" + local_path + "'") else: - print('Downloading \'' + archive_name + '\' ...') + print("Downloading '" + archive_name + "' ...") sys.stdout.flush() if "CYGWIN_NT" in sys_name: import ssl @@ -301,14 +316,19 @@ def get_tool(tool, force_download, force_extract): else: try: urlretrieve(url, local_path, report_progress) - except: + except: # noqa: E722 download_file_with_progress(url, local_path, start_time) sys.stdout.write(" - Done\n") sys.stdout.flush() else: print("Tool {0} already downloaded".format(archive_name)) sys.stdout.flush() - return unpack(local_path, '.', force_extract) + + if sha256sum(local_path) != checksum: + print("Checksum mismatch for {0}".format(archive_name)) + return False + + return unpack(local_path, ".", force_extract) def load_tools_list(filename, platform): @@ -355,29 +375,55 @@ def identify_platform(): print("System: %s, Bits: %d, Info: %s" % (sys_name, bits, sys_platform)) return arduino_platform_names[sys_name][bits] -def print_help(): - print("TODO help") -if __name__ == '__main__': - option_print_help = "-h" in sys.argv - verbose = "-v" in sys.argv - force_download = "-d" in sys.argv - force_extract = "-e" in sys.argv - force_all = "-f" in sys.argv - is_test = "-t" in sys.argv +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Download and extract tools") + + parser.add_argument("-v", "--verbose", + type=bool, + default=False, + required=False, + help="Print verbose output") + + parser.add_argument("-d", "--force_download", + type=bool, + default=False, + required=False, + help="Force download of tools") + + parser.add_argument("-e", "--force_extract", + type=bool, + default=False, + required=False, + help="Force extraction of tools") + + parser.add_argument("-f", "--force_all", + type=bool, + default=False, + required=False, + help="Force download and extraction of tools") + + parser.add_argument("-t", "--test", + type=bool, + default=False, + required=False, + help=argparse.SUPPRESS) + + args = parser.parse_args() - print(f'Debug: options values: option_print_help={option_print_help}; verbose={verbose}; force_download={force_download}; force_extract={force_extract}; force_all={force_all}; is_test={is_test};') - if option_print_help: - print_help() - sys.exit() + verbose = args.verbose + force_download = args.force_download + force_extract = args.force_extract + force_all = args.force_all + is_test = args.test if is_test and (force_download or force_extract or force_all): - print('Cannot combine test (-t) and forced execution (-d | -e | -f)') - print_help() - sys.exit() + print("Cannot combine test (-t) and forced execution (-d | -e | -f)") + parser.print_help(sys.stderr) + sys.exit(1) if is_test: - print('Test run!') + print("Test run!") if force_all: force_download = True @@ -393,9 +439,11 @@ def print_help(): if is_test: print("Would install: {0}".format(tool["archiveFileName"])) else: - if(not get_tool(tool, force_download, force_extract)): - if(verbose): + if not get_tool(tool, force_download, force_extract): + if verbose: print(f"Tool {tool['archiveFileName']} was corrupted. Re-downloading...\n") - if(not get_tool(tool, True, force_extract)): # Corrupted file was renamed, will not be found and will be re-downloaded + if not get_tool( + tool, True, force_extract + ): # Corrupted file was renamed, will not be found and will be re-downloaded print(f"Tool {tool['archiveFileName']} was corrupted, but re-downloading did not help!\n") - print('Platform Tools Installed') + print("Platform Tools Installed")