Skip to content

Commit

Permalink
implement artifact download FIPS
Browse files — browse the repository at this point in the history
  • Loading branch information
adamjanovsky committed Oct 27, 2022
1 parent 39c89c1 commit 9433658
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 112 deletions.
2 changes: 1 addition & 1 deletion cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def main(
),
ProcessingStep(
"download",
"download_artifacts",
"download_all_artifacts",
precondition="meta_sources_parsed",
precondition_error_msg="Error: You want to download all artifacts, but the data from the cert. framework website was not parsed. You must use 'build' action first.",
pre_callback_func=None,
Expand Down
49 changes: 15 additions & 34 deletions sec_certs/dataset/common_criteria.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ def to_pandas(self) -> pd.DataFrame:

return df

@property
def certs_dir(self) -> Path:
"""
Returns directory that holds files associated with certificates
"""
return self.root_dir / "certs"

@property
def reports_dir(self) -> Path:
"""
Expand Down Expand Up @@ -137,6 +130,10 @@ def mu_dataset(self) -> "CCDatasetMaintenanceUpdates":

return CCDatasetMaintenanceUpdates.from_json(self.mu_dataset_path / "Maintenance updates.json")

@property
def artifact_download_methods(self) -> List[Callable]:
    """Ordered download steps run by the base-class artifact download: reports, then targets."""
    steps: List[Callable] = [self._download_reports, self._download_targets]
    return steps

BASE_URL: ClassVar[str] = "https://www.commoncriteriaportal.org"

HTML_PRODUCTS_URL = {
Expand Down Expand Up @@ -517,6 +514,11 @@ def _parse_table(
return certs

def _download_reports(self, fresh: bool = True) -> None:
if fresh:
logger.info("Downloading PDFs of CC certification reports.")
else:
logger.info("Attempting to re-download failed PDFs of CC certification reports.")

self.reports_pdf_dir.mkdir(parents=True, exist_ok=True)
certs_to_process = [x for x in self if x.state.report_is_ok_to_download(fresh) and x.report_link]
cert_processing.process_parallel(
Expand All @@ -527,6 +529,11 @@ def _download_reports(self, fresh: bool = True) -> None:
)

def _download_targets(self, fresh: bool = True) -> None:
if fresh:
logger.info("Downloading PDFs of CC security targets.")
else:
logger.info("Attempting to re-download failed PDFs of CC security targets.")

self.targets_pdf_dir.mkdir(parents=True, exist_ok=True)
certs_to_process = [x for x in self if x.state.report_is_ok_to_download(fresh)]
cert_processing.process_parallel(
Expand All @@ -536,32 +543,6 @@ def _download_targets(self, fresh: bool = True) -> None:
progress_bar_desc="Downloading targets",
)

@serialize
def download_all_artifacts(self, fresh: bool = True) -> None:
    """
    Downloads all pdf files associated with certificates of the dataset.

    Requires meta-sources to be parsed first; on a fresh run, failed
    downloads are retried once with ``fresh=False``.

    :param bool fresh: whether all (true) or only failed (false) pdfs shall be downloaded, defaults to True
    """
    if self.state.meta_sources_parsed is False:
        logger.error("Attempting to download pdfs while not having csv/html meta-sources parsed. Returning.")
        return

    logger.info("Downloading CC sample reports")
    self._download_reports(fresh)

    logger.info("Downloading CC security targets")
    self._download_targets(fresh)

    if fresh:
        # A fresh pass can leave transient (e.g. network) failures behind; retry those once.
        logger.info("Attempting to re-download failed report links.")
        self._download_reports(False)

        logger.info("Attempting to re-download failed security target links.")
        self._download_targets(False)

    self.state.pdfs_downloaded = True

def _convert_reports_to_txt(self, fresh: bool = True) -> None:
self.reports_txt_dir.mkdir(parents=True, exist_ok=True)
certs_to_process = [x for x in self if x.state.report_is_ok_to_convert(fresh)]
Expand Down Expand Up @@ -589,7 +570,7 @@ def convert_all_pdfs(self, fresh: bool = True) -> None:
:param bool fresh: whether all (true) or only failed (false) pdfs shall be converted, defaults to True
"""
if self.state.pdfs_downloaded is False:
if self.state.artifacts_downloaded is False:
logger.info("Attempting to convert pdf while not having them downloaded. Returning.")
return

Expand Down
49 changes: 44 additions & 5 deletions sec_certs/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,23 @@
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Collection, Dict, Generic, Iterator, Optional, Pattern, Set, Tuple, Type, TypeVar, Union, cast
from typing import (
Any,
Callable,
Collection,
Dict,
Generic,
Iterator,
List,
Optional,
Pattern,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)

import pandas as pd
import requests
Expand All @@ -36,7 +52,7 @@ class Dataset(Generic[CertSubType], ComplexSerializableType, ABC):
@dataclass
class DatasetInternalState(ComplexSerializableType):
meta_sources_parsed: bool = False
pdfs_downloaded: bool = False
artifacts_downloaded: bool = False
pdfs_converted: bool = False
certs_analyzed: bool = False

Expand Down Expand Up @@ -100,6 +116,13 @@ def web_dir(self) -> Path:
def auxillary_datasets_dir(self) -> Path:
return self.root_dir / "auxillary_datasets"

@property
def certs_dir(self) -> Path:
    """Directory that holds files associated with certificates."""
    return self.root_dir.joinpath("certs")

@property
def cpe_dataset_path(self) -> Path:
    """Path to the serialized CPE auxiliary dataset (JSON)."""
    return self.auxillary_datasets_dir / "cpe_dataset.json"
Expand All @@ -116,6 +139,11 @@ def nist_cve_cpe_matching_dset_path(self) -> Path:
def json_path(self) -> Path:
return self.root_dir / (self.name + ".json")

@property
@abstractmethod
def artifact_download_methods(self) -> List[Callable]:
    """
    Ordered list of ``_download_*`` callables invoked by
    ``download_all_artifacts``; each is called with a single ``fresh`` flag.
    """
    raise NotImplementedError("Not meant to be implemented by the base class.")

def __contains__(self, item: object) -> bool:
if not isinstance(item, Certificate):
raise TypeError(
Expand Down Expand Up @@ -202,9 +230,20 @@ def get_certs_from_web(self) -> None:
def process_auxillary_datasets(self) -> None:
raise NotImplementedError("Not meant to be implemented by the base class.")

@abstractmethod
def download_all_artifacts(self, cert_ids: Optional[Set[str]] = None) -> None:
raise NotImplementedError("Not meant to be implemented by the base class.")
@serialize
def download_all_artifacts(self, fresh: bool = True) -> None:
    """
    Downloads all artifacts associated with certificates of the dataset.

    Runs every method from ``artifact_download_methods``; on a fresh run,
    each method is invoked a second time with ``fresh=False`` to retry
    downloads that failed transiently.

    :param bool fresh: whether all (true) or only failed (false) artifacts shall be downloaded, defaults to True
    """
    if self.state.meta_sources_parsed is False:
        logger.error("Attempting to download pdfs while not having csv/html meta-sources parsed. Returning.")
        return

    for method in self.artifact_download_methods:
        method(fresh)

    if fresh:
        # Retry pass: only items whose state still marks them as failed are re-attempted.
        for method in self.artifact_download_methods:
            method(False)

    self.state.artifacts_downloaded = True

@abstractmethod
def convert_all_pdfs(self) -> None:
Expand Down
102 changes: 45 additions & 57 deletions sec_certs/dataset/fips.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import shutil
from pathlib import Path
from typing import Dict, Final, List, Optional, Set
from typing import Callable, Dict, Final, List, Set

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -35,12 +35,28 @@ class FIPSDataset(Dataset[FIPSCertificate], ComplexSerializableType):

@property
def policies_dir(self) -> Path:
    """Root directory for security-policy artifacts (holds the pdf/ and txt/ subdirectories)."""
    # Diff residue removed: the pre-commit `self.root_dir / "security_policies"`
    # return line was left interleaved with the new implementation.
    return self.certs_dir / "policies"

@property
def policies_pdf_dir(self) -> Path:
    """Directory holding downloaded PDF security policies."""
    return self.policies_dir / "pdf"

@property
def policies_txt_dir(self) -> Path:
    """Directory holding text conversions of the security policies."""
    return self.policies_dir / "txt"

@property
def module_dir(self) -> Path:
    """Directory holding downloaded HTML module pages."""
    return self.certs_dir / "modules"

@property
def algorithms_dir(self) -> Path:
    """Directory for the auxiliary FIPS algorithm dataset."""
    return self.auxillary_datasets_dir / "algorithms"

@property
def artifact_download_methods(self) -> List[Callable]:
    """Ordered download steps run by the base-class artifact download: HTML modules, then policy PDFs."""
    steps: List[Callable] = [self._download_modules, self._download_policies]
    return steps

@serialize
def _extract_data(self, redo: bool = False) -> None:
"""
Expand All @@ -59,64 +75,37 @@ def _extract_data(self, redo: bool = False) -> None:
for keyword, cert in keywords:
self.certs[cert.dgst].pdf_data.keywords = keyword

def _download_modules(self, fresh: bool = True) -> None:
    """
    Downloads HTML module pages for certificates whose state permits it.

    :param bool fresh: True to download all eligible modules, False to retry only previously failed ones.
    """
    # Diff residue removed: the deleted cert_ids-based `download_all_artifacts`
    # implementation was interleaved with this new method.
    # parents=True added for consistency with the CC dataset's download methods,
    # since module_dir is nested under certs_dir which may not exist yet.
    self.module_dir.mkdir(parents=True, exist_ok=True)

    if fresh:
        logger.info("Downloading HTML cryptographic modules.")
    else:
        logger.info("Attempting re-download of failed HTML cryptographic modules.")

    certs_to_process = [x for x in self if x.state.module_is_ok_to_download(fresh)]
    cert_processing.process_parallel(
        FIPSCertificate.download_module,
        certs_to_process,
        config.n_threads,
        progress_bar_desc="Downloading HTML modules",
    )

def _download_policies(self, fresh: bool = True) -> None:
    """
    Downloads PDF security policies for certificates whose state permits it.

    :param bool fresh: True to download all eligible policies, False to retry only previously failed ones.
    """
    # Diff residue removed: the deleted `_download_all_htmls` implementation
    # (with its manual retry loop) was interleaved with this new method; the
    # retry now lives in the base-class `download_all_artifacts`.
    # parents=True added for consistency with the CC dataset's download methods,
    # since policies_pdf_dir is nested under policies_dir which may not exist yet.
    self.policies_pdf_dir.mkdir(parents=True, exist_ok=True)

    if fresh:
        logger.info("Downloading PDF security policies.")
    else:
        logger.info("Attempting re-download of failed PDF security policies.")

    certs_to_process = [x for x in self if x.state.policy_is_ok_to_download(fresh)]
    cert_processing.process_parallel(
        FIPSCertificate.download_policy,
        certs_to_process,
        config.n_threads,
        progress_bar_desc="Downloading PDF security policies",
    )

@serialize
def convert_all_pdfs(self) -> None:
Expand Down Expand Up @@ -196,7 +185,7 @@ def from_web_latest(cls) -> "FIPSDataset":
def _set_local_paths(self) -> None:
    """Propagates the dataset's artifact directories into every certificate."""
    # Diff residue removed: the pre-commit two-argument `set_local_paths` call
    # was left interleaved with the new three-argument call.
    cert: FIPSCertificate
    for cert in self.certs.values():
        cert.set_local_paths(self.policies_pdf_dir, self.policies_txt_dir, self.web_dir)

@serialize
def get_certs_from_web(self, to_download: bool = True, keep_metadata: bool = True) -> None:
Expand All @@ -221,9 +210,8 @@ def process_auxillary_datasets(self) -> None:

def _process_algorithms(self):
    """Builds the auxiliary FIPS algorithm dataset by scraping its web source."""
    logger.info("Processing FIPS algorithms.")
    # BUG fix: the committed code called mkdir(parernts=True, ...) -- a typo for
    # the `parents` keyword that raises TypeError at runtime.
    self.algorithms_dir.mkdir(parents=True, exist_ok=True)
    self.algorithms = FIPSAlgorithmDataset({}, self.algorithms_dir, "algorithms", "sample algs")
    self.algorithms.get_certs_from_web()
    logger.info(f"Finished parsing. Have algorithm dataset with {len(self.algorithms)} algorithm numbers.")

Expand Down
Loading

0 comments on commit 9433658

Please sign in to comment.