From 36ab74590340294b1b2617939a620a2ab3d52d3f Mon Sep 17 00:00:00 2001 From: strahi-linux Date: Tue, 30 Jul 2024 13:05:57 +0200 Subject: [PATCH] Added mcu list indexing for each mcu package --- scripts/index.py | 13 +++++++- scripts/support.py | 74 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/scripts/index.py b/scripts/index.py index 4655a7ad7..9ede6d9c9 100644 --- a/scripts/index.py +++ b/scripts/index.py @@ -1,5 +1,6 @@ import os, re, time, argparse, requests from elasticsearch import Elasticsearch +from pathlib import Path import support as support @@ -111,6 +112,12 @@ def index_release_to_elasticsearch(es : Elasticsearch, index_name, release_detai metadata_download_url = metadata_asset['url'] metadata_content.append(fetch_json_data(metadata_download_url, token)[0]) + # Fetch documentation regarding latest release + docs_path = os.path.join(str(Path(__file__).parent.parent.absolute()), 'output/docs') + docs_asset = next((a for a in release_details[0]['assets'] if a['name'] == "docs.7z"), None) + support.extract_archive_from_url(docs_asset['url'], docs_path, token) + mcu_check_list = support.fetch_package_mcus(docs_path) + for asset in release_details[0].get('assets', []): # Do not index metadata or docs if asset['name'] == 'metadata.json' or asset['name'] == 'docs.7z': @@ -186,7 +193,11 @@ def index_release_to_elasticsearch(es : Elasticsearch, index_name, release_detai 'preinit', 'unit_test_lib', 'mikroe_utils_common' - ] + ], + 'mcus': support.fetch_mcu_list( + name_without_extension, + mcu_check_list + ) } ) diff --git a/scripts/support.py b/scripts/support.py index 53fefe789..a1b5cb17b 100644 --- a/scripts/support.py +++ b/scripts/support.py @@ -1,3 +1,5 @@ +import os, io, json, requests + def get_previous_release(releases, prerelases=None): ''' Fetch the previously released version ''' for counter, release in enumerate(releases): @@ -14,3 +16,75 @@ def get_previous_release(releases, prerelases=None): def get_latest_release(releases): ''' Fetch the latest released version ''' return next((release for release in releases if not release['prerelease'] and not release['draft']), None) + +def determine_archive_type(byte_stream): + ''' + Implement logic to determine the archive type, e.g., by file extension or magic number + For simplicity, let's assume byte_stream has a 'name' attribute (e.g., a file-like object) + ''' + byte_stream.seek(0) + signature = byte_stream.read(4) + byte_stream.seek(0) + if signature == b'PK\x03\x04': # ZIP magic number, it is what it is + return 'zip' + else: + return '7z' + +def extract_archive_from_url(url, destination, token = None): + """ + Extract the contents of an archive (7z or zip) from a URL directly + in memory, without downloading the file. + """ + print(f"Download link: {url}") + headers = { + 'Authorization': f'token {token}', + 'Accept': 'application/octet-stream' + } + if 'github' in url: + response = requests.get(url, headers=headers, stream=True) + else: + response = requests.get(url, stream=True) + + response.raise_for_status() + + if response.status_code == 200: ## Response OK? + with io.BytesIO() as byte_stream: + + for chunk in response.iter_content(chunk_size=8192): + byte_stream.write(chunk) + + byte_stream.seek(0) + + archive_type = determine_archive_type(byte_stream) + + if archive_type == '7z': + import py7zr + with py7zr.SevenZipFile(byte_stream, mode='r') as archive: + archive.extractall(path=destination) + elif archive_type == 'zip': + import zipfile + with zipfile.ZipFile(byte_stream, mode='r') as archive: + for info in archive.infolist(): + archive.extract(info, path=destination) + else: + raise ValueError("Unsupported archive type") + else: + raise Exception(f"Failed to download file: status code {response.status_code}") + +def fetch_package_mcus(check_path): + json_files = [] + for root, dirs, files in os.walk(check_path): + for file in files: + if file.endswith('full.json'): + json_files.append(os.path.join(root, file)) + return json_files + +def fetch_mcu_list(package_name, file_list): + for each_file in file_list: + with open(each_file, 'r') as json_file: + json_file_content = json.load(json_file) + json_file.close() + for item in json_file_content: + if package_name in item: + return item[package_name]["mcus"] + return None