Skip to content

Commit

Permalink
Merge pull request #77 from MikroElektronika/topic/index-mcu-info
Browse files Browse the repository at this point in the history
Added mcu list indexing for each mcu package
  • Loading branch information
StrahinjaJacimovic authored Jul 31, 2024
2 parents d5e1ae4 + 36ab745 commit 2bebb6c
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
13 changes: 12 additions & 1 deletion scripts/index.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os, re, time, argparse, requests
from elasticsearch import Elasticsearch
from pathlib import Path

import support as support

Expand Down Expand Up @@ -111,6 +112,12 @@ def index_release_to_elasticsearch(es : Elasticsearch, index_name, release_detai
metadata_download_url = metadata_asset['url']
metadata_content.append(fetch_json_data(metadata_download_url, token)[0])

# Fetch documentation regarding latest release
docs_path = os.path.join(str(Path(__file__).parent.parent.absolute()), 'output/docs')
docs_asset = next((a for a in release_details[0]['assets'] if a['name'] == "docs.7z"), None)
support.extract_archive_from_url(docs_asset['url'], docs_path, token)
mcu_check_list = support.fetch_package_mcus(docs_path)

for asset in release_details[0].get('assets', []):
# Do not index metadata or docs
if asset['name'] == 'metadata.json' or asset['name'] == 'docs.7z':
Expand Down Expand Up @@ -186,7 +193,11 @@ def index_release_to_elasticsearch(es : Elasticsearch, index_name, release_detai
'preinit',
'unit_test_lib',
'mikroe_utils_common'
]
],
'mcus': support.fetch_mcu_list(
name_without_extension,
mcu_check_list
)
}
)

Expand Down
74 changes: 74 additions & 0 deletions scripts/support.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os, io, json, requests

def get_previous_release(releases, prerelases=None):
''' Fetch the previously released version '''
for counter, release in enumerate(releases):
Expand All @@ -14,3 +16,75 @@ def get_previous_release(releases, prerelases=None):
def get_latest_release(releases):
''' Fetch the latest released version '''
return next((release for release in releases if not release['prerelease'] and not release['draft']), None)

def determine_archive_type(byte_stream):
'''
Implement logic to determine the archive type, e.g., by file extension or magic number
For simplicity, let's assume byte_stream has a 'name' attribute (e.g., a file-like object)
'''
byte_stream.seek(0)
signature = byte_stream.read(4)
byte_stream.seek(0)
if signature == b'PK\x03\x04': # ZIP magic number, it is what it is
return 'zip'
else:
return '7z'

def extract_archive_from_url(url, destination, token = None):
"""
Extract the contents of an archive (7z or zip) from a URL directly
in memory, without downloading the file.
"""
print(f"Download link: {url}")
headers = {
'Authorization': f'token {token}',
'Accept': 'application/octet-stream'
}
if 'github' in url:
response = requests.get(url, headers=headers, stream=True)
else:
response = requests.get(url, stream=True)

response.raise_for_status()

if response.status_code == 200: ## Response OK?
with io.BytesIO() as byte_stream:

for chunk in response.iter_content(chunk_size=8192):
byte_stream.write(chunk)

byte_stream.seek(0)

archive_type = determine_archive_type(byte_stream)

if archive_type == '7z':
import py7zr
with py7zr.SevenZipFile(byte_stream, mode='r') as archive:
archive.extractall(path=destination)
elif archive_type == 'zip':
import zipfile
with zipfile.ZipFile(byte_stream, mode='r') as archive:
for info in archive.infolist():
archive.extract(info, path=destination)
else:
raise ValueError("Unsupported archive type")
else:
raise Exception(f"Failed to download file: status code {response.status_code}")

def fetch_package_mcus(check_path):
json_files = []
for root, dirs, files in os.walk(check_path):
for file in files:
if file.endswith('full.json'):
json_files.append(os.path.join(root, file))
return json_files

def fetch_mcu_list(package_name, file_list):
for each_file in file_list:
with open(each_file, 'r') as json_file:
json_file_content = json.load(json_file)
json_file.close()
for item in json_file_content:
if package_name in item:
return item[package_name]["mcus"]
return None

0 comments on commit 2bebb6c

Please sign in to comment.