diff --git a/cli.py b/cli.py index 9057e1e2..eaa0f276 100644 --- a/cli.py +++ b/cli.py @@ -161,7 +161,7 @@ def main( ), ProcessingStep( "download", - "download_artifacts", + "download_all_artifacts", precondition="meta_sources_parsed", precondition_error_msg="Error: You want to download all artifacts, but the data from the cert. framework website was not parsed. You must use 'build' action first.", pre_callback_func=None, diff --git a/sec_certs/dataset/common_criteria.py b/sec_certs/dataset/common_criteria.py index 696e08c1..9e022a5b 100644 --- a/sec_certs/dataset/common_criteria.py +++ b/sec_certs/dataset/common_criteria.py @@ -64,13 +64,6 @@ def to_pandas(self) -> pd.DataFrame: return df - @property - def certs_dir(self) -> Path: - """ - Returns directory that holds files associated with certificates - """ - return self.root_dir / "certs" - @property def reports_dir(self) -> Path: """ @@ -137,6 +130,10 @@ def mu_dataset(self) -> "CCDatasetMaintenanceUpdates": return CCDatasetMaintenanceUpdates.from_json(self.mu_dataset_path / "Maintenance updates.json") + @property + def artifact_download_methods(self) -> List[Callable]: + return [self._download_reports, self._download_targets] + BASE_URL: ClassVar[str] = "https://www.commoncriteriaportal.org" HTML_PRODUCTS_URL = { @@ -517,6 +514,11 @@ def _parse_table( return certs def _download_reports(self, fresh: bool = True) -> None: + if fresh: + logger.info("Downloading PDFs of CC certification reports.") + else: + logger.info("Attempting to re-download failed PDFs of CC certification reports.") + self.reports_pdf_dir.mkdir(parents=True, exist_ok=True) certs_to_process = [x for x in self if x.state.report_is_ok_to_download(fresh) and x.report_link] cert_processing.process_parallel( @@ -527,6 +529,11 @@ def _download_reports(self, fresh: bool = True) -> None: ) def _download_targets(self, fresh: bool = True) -> None: + if fresh: + logger.info("Downloading PDFs of CC security targets.") + else: + logger.info("Attempting to 
re-download failed PDFs of CC security targets.") + self.targets_pdf_dir.mkdir(parents=True, exist_ok=True) certs_to_process = [x for x in self if x.state.report_is_ok_to_download(fresh)] cert_processing.process_parallel( @@ -536,32 +543,6 @@ def _download_targets(self, fresh: bool = True) -> None: progress_bar_desc="Downloading targets", ) - @serialize - def download_all_artifacts(self, fresh: bool = True) -> None: - """ - Downloads all pdf files associated with certificates of the datset. - - :param bool fresh: whether all (true) or only failed (false) pdfs shall be downloaded, defaults to True - """ - if self.state.meta_sources_parsed is False: - logger.error("Attempting to download pdfs while not having csv/html meta-sources parsed. Returning.") - return - - logger.info("Downloading CC sample reports") - self._download_reports(fresh) - - logger.info("Downloading CC security targets") - self._download_targets(fresh) - - if fresh is True: - logger.info("Attempting to re-download failed report links.") - self._download_reports(False) - - logger.info("Attempting to re-download failed security target links.") - self._download_targets(False) - - self.state.pdfs_downloaded = True - def _convert_reports_to_txt(self, fresh: bool = True) -> None: self.reports_txt_dir.mkdir(parents=True, exist_ok=True) certs_to_process = [x for x in self if x.state.report_is_ok_to_convert(fresh)] @@ -589,7 +570,7 @@ def convert_all_pdfs(self, fresh: bool = True) -> None: :param bool fresh: whether all (true) or only failed (false) pdfs shall be converted, defaults to True """ - if self.state.pdfs_downloaded is False: + if self.state.artifacts_downloaded is False: logger.info("Attempting to convert pdf while not having them downloaded. 
Returning.") return diff --git a/sec_certs/dataset/dataset.py b/sec_certs/dataset/dataset.py index d5c3afaa..4b1c8c7b 100644 --- a/sec_certs/dataset/dataset.py +++ b/sec_certs/dataset/dataset.py @@ -9,7 +9,23 @@ from dataclasses import dataclass from datetime import datetime from pathlib import Path -from typing import Any, Collection, Dict, Generic, Iterator, Optional, Pattern, Set, Tuple, Type, TypeVar, Union, cast +from typing import ( + Any, + Callable, + Collection, + Dict, + Generic, + Iterator, + List, + Optional, + Pattern, + Set, + Tuple, + Type, + TypeVar, + Union, + cast, +) import pandas as pd import requests @@ -36,7 +52,7 @@ class Dataset(Generic[CertSubType], ComplexSerializableType, ABC): @dataclass class DatasetInternalState(ComplexSerializableType): meta_sources_parsed: bool = False - pdfs_downloaded: bool = False + artifacts_downloaded: bool = False pdfs_converted: bool = False certs_analyzed: bool = False @@ -100,6 +116,13 @@ def web_dir(self) -> Path: def auxillary_datasets_dir(self) -> Path: return self.root_dir / "auxillary_datasets" + @property + def certs_dir(self) -> Path: + """ + Returns directory that holds files associated with certificates + """ + return self.root_dir / "certs" + @property def cpe_dataset_path(self) -> Path: return self.auxillary_datasets_dir / "cpe_dataset.json" @@ -116,6 +139,11 @@ def nist_cve_cpe_matching_dset_path(self) -> Path: def json_path(self) -> Path: return self.root_dir / (self.name + ".json") + @property + @abstractmethod + def artifact_download_methods(self) -> List[Callable]: + raise NotImplementedError("Not meant to be implemented by the base class.") + def __contains__(self, item: object) -> bool: if not isinstance(item, Certificate): raise TypeError( @@ -202,9 +230,20 @@ def get_certs_from_web(self) -> None: def process_auxillary_datasets(self) -> None: raise NotImplementedError("Not meant to be implemented by the base class.") - @abstractmethod - def download_all_artifacts(self, cert_ids: 
Optional[Set[str]] = None) -> None: - raise NotImplementedError("Not meant to be implemented by the base class.") + @serialize + def download_all_artifacts(self, fresh: bool = True) -> None: + if self.state.meta_sources_parsed is False: + logger.error("Attempting to download pdfs while not having csv/html meta-sources parsed. Returning.") + return + + for method in self.artifact_download_methods: + method(fresh) + + if fresh: + for method in self.artifact_download_methods: + method(False) + + self.state.artifacts_downloaded = True @abstractmethod def convert_all_pdfs(self) -> None: diff --git a/sec_certs/dataset/fips.py b/sec_certs/dataset/fips.py index 24550de6..c0d4a68a 100644 --- a/sec_certs/dataset/fips.py +++ b/sec_certs/dataset/fips.py @@ -2,7 +2,7 @@ import logging import shutil from pathlib import Path -from typing import Dict, Final, List, Optional, Set +from typing import Callable, Dict, Final, List, Set import numpy as np import pandas as pd @@ -35,12 +35,28 @@ class FIPSDataset(Dataset[FIPSCertificate], ComplexSerializableType): @property def policies_dir(self) -> Path: - return self.root_dir / "security_policies" + return self.certs_dir / "policies" + + @property + def policies_pdf_dir(self) -> Path: + return self.policies_dir / "pdf" + + @property + def policies_txt_dir(self) -> Path: + return self.policies_dir / "txt" + + @property + def module_dir(self) -> Path: + return self.certs_dir / "modules" @property def algorithms_dir(self) -> Path: return self.auxillary_datasets_dir / "algorithms" + @property + def artifact_download_methods(self) -> List[Callable]: + return [self._download_modules, self._download_policies] + @serialize def _extract_data(self, redo: bool = False) -> None: """ @@ -59,64 +75,37 @@ def _extract_data(self, redo: bool = False) -> None: for keyword, cert in keywords: self.certs[cert.dgst].pdf_data.keywords = keyword - def download_all_artifacts(self, cert_ids: Optional[Set[str]] = None) -> None: - """ - Downloads all pdf files 
related to the certificates specified with cert_ids. + def _download_modules(self, fresh: bool = True) -> None: + self.module_dir.mkdir(parents=True, exist_ok=True) - :param Optional[Set[str]] cert_ids: cert_ids to download the pdfs foor, defaults to None - :raises RuntimeError: If no cert_ids are specified, raises. - """ - # TODO: The code below was migrated here from get_certs_web() - # self.policies_dir.mkdir(exist_ok=True) - # self.algorithms_dir.mkdir(exist_ok=True) - # logger.info("Downloading certificate html and security policies") - # self._download_all_htmls(cert_ids) - # self.download_all_pdfs(cert_ids) - # self.web_scan(cert_ids, redo=redo_web_scan, update_json=False) - - sp_paths, sp_urls = [], [] - self.policies_dir.mkdir(exist_ok=True) - if cert_ids is None: - raise RuntimeError("You need to provide cert ids to FIPS download PDFs functionality.") - for cert_id in cert_ids: - if not (self.policies_dir / f"{cert_id}.pdf").exists() or ( - fips_dgst(cert_id) in self.certs and not self.certs[fips_dgst(cert_id)].state.txt_state - ): - sp_urls.append(constants.FIPS_SP_URL.format(cert_id)) - sp_paths.append(self.policies_dir / f"{cert_id}.pdf") - logger.info(f"downloading {len(sp_urls)} module pdf files") + if fresh: + logger.info("Downloading HTML cryptographic modules.") + else: + logger.info("Attempting re-download of failed HTML cryptographic modules.") + + certs_to_process = [x for x in self if x.state.module_is_ok_to_download(fresh)] cert_processing.process_parallel( - FIPSCertificate.download_security_policy, - list(zip(sp_urls, sp_paths)), + FIPSCertificate.download_module, + certs_to_process, config.n_threads, - progress_bar_desc="Downloading PDF files", + progress_bar_desc="Downloading HTML modules", ) - def _download_all_htmls(self, cert_ids: Set[str]) -> None: - html_paths, html_urls = [], [] - self.web_dir.mkdir(exist_ok=True) - for cert_id in cert_ids: - if not (self.web_dir / f"{cert_id}.html").exists(): - 
html_urls.append(constants.FIPS_MODULE_URL.format(cert_id)) - html_paths.append(self.web_dir / f"{cert_id}.html") - - logger.info(f"downloading {len(html_urls)} module html files") - failed = cert_processing.process_parallel( - FIPSCertificate.download_html_page, - list(zip(html_urls, html_paths)), + def _download_policies(self, fresh: bool = True) -> None: + self.policies_pdf_dir.mkdir(parents=True, exist_ok=True) + + if fresh: + logger.info("Downloading PDF security policies.") + else: + logger.info("Attempting re-download of failed PDF security policies.") + + certs_to_process = [x for x in self if x.state.policy_is_ok_to_download(fresh)] + cert_processing.process_parallel( + FIPSCertificate.download_policy, + certs_to_process, config.n_threads, - progress_bar_desc="Downloading HTML files", + progress_bar_desc="Downloading PDF security policies", ) - failed = [c for c in failed if c] - - if len(failed) != 0: - logger.info(f"Download failed for {len(failed)} files. Retrying...") - cert_processing.process_parallel( - FIPSCertificate.download_html_page, - failed, - config.n_threads, - progress_bar_desc="Downloading HTML files again", - ) @serialize def convert_all_pdfs(self) -> None: @@ -196,7 +185,7 @@ def from_web_latest(cls) -> "FIPSDataset": def _set_local_paths(self) -> None: cert: FIPSCertificate for cert in self.certs.values(): - cert.set_local_paths(self.policies_dir, self.web_dir) + cert.set_local_paths(self.policies_pdf_dir, self.policies_txt_dir, self.web_dir) @serialize def get_certs_from_web(self, to_download: bool = True, keep_metadata: bool = True) -> None: @@ -221,9 +210,8 @@ def process_auxillary_datasets(self) -> None: def _process_algorithms(self): logger.info("Processing FIPS algorithms.") - self.algorithms = FIPSAlgorithmDataset( - {}, Path(self.root_dir / "web" / "algorithms"), "algorithms", "sample algs" - ) + self.algorithms_dir.mkdir(parents=True, exist_ok=True) + self.algorithms = FIPSAlgorithmDataset({}, self.algorithms_dir, "algorithms", "sample 
algs") self.algorithms.get_certs_from_web() logger.info(f"Finished parsing. Have algorithm dataset with {len(self.algorithms)} algorithm numbers.") diff --git a/sec_certs/sample/fips.py b/sec_certs/sample/fips.py index bfd0e685..9a122646 100644 --- a/sec_certs/sample/fips.py +++ b/sec_certs/sample/fips.py @@ -248,22 +248,46 @@ class InternalState(ComplexSerializableType): file_status: bool txt_state: bool - sp_path: Path - html_path: Path + policy_pdf_path: Path + policy_txt_path: Path + module_html_path: Path + + module_download_ok: bool + policy_download_ok: bool + + errors: List[str] def __init__( self, tables_done: bool = False, file_status: bool = True, # TODO: Check if this is correct txt_state: bool = True, # TODO: Check if this is correct + module_download_ok: bool = False, + policy_download_ok: bool = False, + errors: Optional[List[str]] = None, ): self.tables_done = tables_done self.file_status = file_status self.txt_state = txt_state + self.module_download_ok = module_download_ok + self.policy_download_ok = policy_download_ok + self.errors = errors if errors else [] + + @property + def serialized_attributes(self) -> List[str]: + # TODO: Fix me, add other variables + return ["tables_done", "file_status", "txt_state"] + + def module_is_ok_to_download(self, fresh: bool = True) -> bool: + return True if fresh else not self.module_download_ok + + def policy_is_ok_to_download(self, fresh: bool = True) -> bool: + return True if fresh else not self.policy_download_ok - def set_local_paths(self, sp_dir: Path, web_dir: Path) -> None: - self.state.sp_path = (sp_dir / str(self.dgst)).with_suffix(".pdf") - self.state.html_path = (web_dir / str(self.dgst)).with_suffix(".html") + def set_local_paths(self, policies_pdf_dir: Path, policies_txt_dir: Path, web_dir: Path) -> None: + self.state.policy_pdf_path = (policies_pdf_dir / str(self.dgst)).with_suffix(".pdf") + self.state.policy_txt_path = (policies_txt_dir / str(self.dgst)).with_suffix(".txt") + 
self.state.module_html_path = (web_dir / str(self.dgst)).with_suffix(".html") @dataclass(eq=True) class WebData(ComplexSerializableType): @@ -368,6 +392,14 @@ def dgst(self) -> str: def manufacturer(self) -> Optional[str]: # type: ignore return self.web_data.vendor + @property + def module_html_url(self) -> str: + return constants.FIPS_MODULE_URL.format(self.dgst) + + @property + def policy_pdf_url(self) -> str: + return constants.FIPS_SP_URL.format(self.dgst) + @property def name(self) -> Optional[str]: # type: ignore return self.web_data.module_name @@ -555,18 +587,29 @@ def download_html_page(cert: Tuple[str, Path]) -> Optional[Tuple[str, Path]]: return None @staticmethod - def download_security_policy(cert: Tuple[str, Path]) -> None: - """ - Downloads security policy file from web. Staticmethod to allow for parametrization. - """ - exit_code = helpers.download_file(*cert, delay=constants.FIPS_DOWNLOAD_DELAY) - if exit_code != requests.codes.ok: - logger.error(f"Failed to download security policy from {cert[0]}, code: {exit_code}") + def download_module(cert: FIPSCertificate) -> FIPSCertificate: + if (exit_code := helpers.download_file(cert.module_html_url, cert.state.module_html_path)) != requests.codes.ok: + error_msg = f"failed to download html module from {cert.module_html_url}, code {exit_code}" + logger.error(f"Cert dgst: {cert.dgst} " + error_msg) + cert.state.module_download_ok = False + else: + cert.state.module_download_ok = True + return cert + + @staticmethod + def download_policy(cert: FIPSCertificate) -> FIPSCertificate: + if (exit_code := helpers.download_file(cert.policy_pdf_url, cert.state.policy_pdf_path)) != requests.codes.ok: + error_msg = f"failed to download pdf policy from {cert.policy_pdf_url}, code {exit_code}" + logger.error(f"Cert dgst: {cert.dgst} " + error_msg) + cert.state.policy_download_ok = False + else: + cert.state.policy_download_ok = True + return cert @staticmethod def extract_sp_metadata(cert: FIPSCertificate) -> 
FIPSCertificate: """Extract the PDF metadata from the security policy. Staticmethod to allow for parametrization.""" - _, metadata = sec_certs.utils.pdf.extract_pdf_metadata(cert.state.sp_path) + _, metadata = sec_certs.utils.pdf.extract_pdf_metadata(cert.state.policy_pdf_path) cert.pdf_data.st_metadata = metadata if metadata else dict() return cert @@ -626,7 +669,9 @@ def find_keywords(cert: FIPSCertificate) -> Tuple[Optional[Dict], FIPSCertificat if not cert.state.txt_state: return None, cert - keywords = sec_certs.utils.extract.extract_keywords(cert.state.sp_path.with_suffix(".pdf.txt"), fips_rules) + keywords = sec_certs.utils.extract.extract_keywords( + cert.state.policy_pdf_path.with_suffix(".pdf.txt"), fips_rules + ) return keywords, cert @staticmethod @@ -653,7 +698,7 @@ def extract_algorithm_certificates(current_text): ): return cert.state.tables_done, cert, set() - cert_file = cert.state.sp_path + cert_file = cert.state.policy_pdf_path txt_file = cert_file.with_suffix(".pdf.txt") with open(txt_file, "r", encoding="utf-8") as f: tables = sec_certs.utils.tables.find_tables(f.read(), txt_file)