diff --git a/mdto/__init__.py b/mdto/__init__.py index 7685125..832951b 100644 --- a/mdto/__init__.py +++ b/mdto/__init__.py @@ -1,3 +1,2 @@ -from .mdto import * -from .mdto import _pronominfo_fido, _pronominfo_siegfried - +from .classes import * +from .utilities import * diff --git a/mdto/mdto.py b/mdto/classes.py similarity index 56% rename from mdto/mdto.py rename to mdto/classes.py index c1af431..ccdc97d 100755 --- a/mdto/mdto.py +++ b/mdto/classes.py @@ -1,39 +1,19 @@ import dataclasses -import hashlib -import json -import logging -import os -import shutil -import subprocess from dataclasses import dataclass -from datetime import datetime -from pathlib import Path from typing import Any, List, TextIO, Union, get_args, get_origin -# allow running mdto.py directly from interpreter: +# allow running directly from interpreter: try: from . import helpers except ImportError: import helpers import lxml.etree as ET -import validators # globals MDTO_MAX_NAAM_LENGTH = 80 -# setup logging -logging.basicConfig( - format="[%(asctime)s] %(levelname)s: %(message)s", datefmt="%H:%M:%S" -) -logging.addLevelName( - # colorize warning messages - logging.WARNING, - "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING), -) - - class ValidationError(TypeError): """Custom formatter for MDTO validation errors""" @@ -188,17 +168,17 @@ class VerwijzingGegevens(Serializable): verwijzingIdentificatie (Optional[IdentificatieGegevens]): Identificatie van object waarnaar verwezen wordt """ + verwijzingNaam: str + verwijzingIdentificatie: IdentificatieGegevens = None + def validate(self): """Warn about long names.""" super().validate() if len(self.verwijzingNaam) > MDTO_MAX_NAAM_LENGTH: - logging.warning( + helpers.logging.warning( f"VerwijzingGegevens.verwijzingNaam: {self.verwijzingNaam} exceeds recommended length of {MDTO_MAX_NAAM_LENGTH}" ) - verwijzingNaam: str - verwijzingIdentificatie: IdentificatieGegevens = None - @dataclass class BegripGegevens(Serializable): @@ -445,7 +425,7 @@ def validate(self): """Warn about long names.""" super().validate() if len(self.naam) > MDTO_MAX_NAAM_LENGTH: - logging.warning( + helpers.logging.warning( f"{self.__class__.__name__}.naam: {self.naam} exceeds recommended length of {MDTO_MAX_NAAM_LENGTH}" ) @@ -663,523 +643,3 @@ def validate(self) -> None: ["bestand", "URLBestand"], f"url {self.URLBestand} is malformed", ) - - -def _pronominfo_fido(file: str | Path) -> BegripGegevens: - # Note: fido currently lacks a public API - # Hence, the most robust solution is to invoke fido as a cli program - # Upstream issue: https://github.com/openpreserve/fido/issues/94 - # FIXME: log more warnings from fido? - cmd = [ - "fido", - "-q", - "-matchprintf", - "OK,%(info.formatname)s,%(info.puid)s,\n", - "-nomatchprintf", - "FAIL", - file, - ] - - result = subprocess.run( - cmd, capture_output=True, shell=False, text=True, check=True - ) - - stdout, stderr = result.stdout, result.stderr - - # fido prints warnings about empty files to stderr - if "(empty)" in stderr.lower(): - logging.warning(f"{file} appears to be an empty file") - - # found a match! - if stdout.startswith("OK"): - matches = stdout.rstrip().split("\n") - if len(matches) > 1: - logging.warning( - "fido returned more than one PRONOM match " - f"for {file}. Selecting the first one." - ) - - # strip "OK" from the output - matches = matches[0].split(",")[1:] - verwijzing = VerwijzingGegevens(verwijzingNaam="PRONOM-register") - return BegripGegevens( - begripLabel=matches[0], - begripCode=matches[1], - begripBegrippenlijst=verwijzing, - ) - else: - raise RuntimeError(f"fido PRONOM detection failed on {file}") - - -def _pronominfo_siegfried(file: str | Path) -> BegripGegevens: - cmd = ["sf", "--json", "--sym", file] - - result = subprocess.run( - cmd, capture_output=True, shell=False, text=True, check=True - ) - - # extract info about first file, since only one file is being passed to sf - sf_json = json.loads(result.stdout)["files"][0] - - if "empty" in sf_json["errors"]: - logging.warning(f"{file} appears to be an empty file") - - # extract match - matches = sf_json["matches"] - if len(matches) > 1: - logging.warning( - "siegfried returned more than one PRONOM match " - f"for {file}. Selecting the first one." - ) - match = matches[0] - - # check if a match was found - if match["id"] == "UNKNOWN": - raise RuntimeError( - f"siegfried failed to detect PRONOM information about {file}" - ) - - # log sf's warnings (such as extension mismatches) - warning = match["warning"] - if warning: - logging.warning(f"siegfried reports PRONOM warning about {file}: {warning}") - - return BegripGegevens( - begripLabel=match["format"], - begripCode=match["id"], - begripBegrippenlijst=VerwijzingGegevens("PRONOM-register"), - ) - - -def pronominfo(file: str | Path) -> BegripGegevens: - """Generate PRONOM information about `file`. This information can be used in - a Bestand's `` tag. - - mdto.py supports two backends for PRONOM detection: fido and sf - (siegfried). The default backend is sf; unless sf is not found, in which - case fido is used as an automatic fallback. Set the environment variable - `PRONOM_BACKEND` to fido/siegfried to manually select a backend - (e.g. `PRONOM_BACKEND=fido your_script.py ...`). - - Args: - file (str | Path): Path to the file to inspect - - Returns: - BegripGegevens: Object with the following attributes: - - `begripLabel`: The file's PRONOM signature name - - `begripCode`: The file's PRONOM ID - - `begripBegrippenLijst`: A reference to the PRONOM registry - """ - # check if file exists and is indeed a file (as opposed to a directory) - if not os.path.isfile(file): - raise TypeError(f"File '{file}' does not exist or might be a directory") - - siegfried_found = shutil.which("sf") - fido_found = shutil.which("fido") - pronom_backend = os.environ.get("PRONOM_BACKEND", None) - - if pronom_backend is not None and pronom_backend not in ("fido", "siegfried", "sf"): - raise ValueError( - f"invalid PRONOM backend '{pronom_backend}' specified in PRONOM_BACKEND. " - "Valid options are 'fido' or 'sf'" - ) - - # If PRONOM_BACKEND is not set, default to siegfried, unless siegfried is not found. - # In that case, fallback to fido. - if pronom_backend is None: - if siegfried_found: - pronom_backend = "sf" - elif fido_found: - pronom_backend = "fido" - else: - raise RuntimeError( - "Neither 'fido' nor 'sf' (siegfried) appear to be installed. " - "At least one of these program is required for PRONOM detection. " - "For installation instructions, " - "see https://github.com/openpreserve/fido#installation (fido) " - "or https://github.com/richardlehane/siegfried#install (siegfried)" - ) - - if pronom_backend in ("sf", "siegfried"): - if not siegfried_found: - raise RuntimeError( - "Program 'sf' (siegfried) not found. " - "For installation instructions, see https://github.com/richardlehane/siegfried#install" - ) - # log choice? - return _pronominfo_siegfried(file) - - elif pronom_backend == "fido": - if not fido_found: - raise RuntimeError( - "Program 'fido' not found. " - "For installation instructions, see https://github.com/openpreserve/fido#installation" - ) - # log choice? - return _pronominfo_fido(file) - - -def _detect_verwijzing(informatieobject: TextIO | str) -> VerwijzingGegevens: - """A Bestand object must contain a reference to a corresponding - informatieobject. Specifically, it expects an tag with - the following children: - - 1. : name of the informatieobject - 2. (optional): reference to the informatieobject's - ID and source thereof - - This function infers these so-called 'VerwijzingGegevens' by parsing the XML - of the file `informatieobject`. - - Args: - informatieobject (TextIO | str): XML file to infer VerwijzingGegevens from - - Returns: - VerwijzingGegevens: reference to the informatieobject specified by `informatieobject` - """ - - id_gegevens = None - namespaces = {"mdto": "https://www.nationaalarchief.nl/mdto"} - tree = ET.parse(informatieobject) - root = tree.getroot() - - id_xpath = ".//mdto:informatieobject/mdto:identificatie/" - - kenmerk = root.find(f"{id_xpath}mdto:identificatieKenmerk", namespaces=namespaces) - bron = root.find(f"{id_xpath}mdto:identificatieBron", namespaces=namespaces) - naam = root.find(".//mdto:informatieobject/mdto:naam", namespaces=namespaces) - - if None in [kenmerk, bron]: - raise ValueError(f"Failed to detect in {informatieobject}") - - identificatie = IdentificatieGegevens(kenmerk.text, bron.text) - - if naam is None: - raise ValueError(f"Failed to detect in {informatieobject}") - - return VerwijzingGegevens(naam.text, identificatie) - - -def bestand_from_file( - file: TextIO | str, - identificatie: IdentificatieGegevens | List[IdentificatieGegevens], - isrepresentatievan: VerwijzingGegevens | TextIO | str, - url: str = None, -) -> Bestand: - """Convenience function for creating a Bestand object from a file. - - This function differs from calling Bestand() directly in that it - infers most technical information for you (checksum, PRONOM info, - etc.) by inspecting `file`. The value of , for example, is - always set to the basename of `file`. - - - Args: - file (TextIO | str): the file the Bestand object represents - identificatie (IdentificatieGegevens | List[IdentificatieGegevens]): identificatiekenmerk of - Bestand object - isrepresentatievan (TextIO | str | VerwijzingGegevens): a XML - file containing an informatieobject, or a - VerwijzingGegevens referencing an informatieobject. - Used to construct the values for . - url (Optional[str]): value of - - Example: - ```python - - verwijzing_obj = VerwijzingGegevens("vergunning.mdto.xml") - bestand = mdto.bestand_from_file( - "vergunning.pdf", - IdentificatieGegevens('34c5-4379-9f1a-5c378', 'Proza (DMS)'), - isrepresentatievan=verwijzing_obj # or pass the actual file - ) - bestand.save("vergunning.bestand.mdto.xml") - ``` - - Returns: - Bestand: new Bestand object - """ - file = helpers.process_file(file) - - # set to basename - naam = os.path.basename(file.name) - - omvang = os.path.getsize(file.name) - bestandsformaat = pronominfo(file.name) - checksum = create_checksum(file) - - # file or file path? - if isinstance(isrepresentatievan, (str, Path)) or hasattr( - isrepresentatievan, "read" - ): - informatieobject_file = helpers.process_file(isrepresentatievan) - # Construct verwijzing from informatieobject file - verwijzing_obj = _detect_verwijzing(informatieobject_file) - informatieobject_file.close() - elif isinstance(isrepresentatievan, VerwijzingGegevens): - verwijzing_obj = isrepresentatievan - else: - raise TypeError( - "isrepresentatievan must either be a path/file, or a VerwijzingGegevens object." - ) - - file.close() - - return Bestand( - identificatie, naam, omvang, bestandsformaat, checksum, verwijzing_obj, url - ) - - -def create_checksum( - file_or_filename: TextIO | str, algorithm: str = "sha256" -) -> ChecksumGegevens: - """Convience function for creating ChecksumGegegevens objects. - - Takes a file-like object or path to file, and then generates the requisite - checksum metadata (i.e. `checksumAlgoritme`, `checksumWaarde`, and - `checksumDatum`) from that file. - - Example: - - ```python - pdf_checksum = create_checksum('document.pdf') - # create ChecksumGegevens with a 512 bits instead of a 256 bits checksum - jpg_checksum = create_checksum('scan-003.jpg', algorithm="sha512") - ``` - - Args: - infile (TextIO | str): file-like object to generate checksum data for - algorithm (Optional[str]): checksum algorithm to use; defaults to sha256. - For valid values, see https://docs.python.org/3/library/hashlib.html - - Returns: - ChecksumGegevens: checksum metadata from `file_or_filename` - """ - infile = helpers.process_file(file_or_filename) - verwijzing = VerwijzingGegevens( - verwijzingNaam="Begrippenlijst ChecksumAlgoritme MDTO" - ) - - checksumAlgoritme = BegripGegevens( - begripLabel=algorithm.upper().replace("SHA", "SHA-"), - begripBegrippenlijst=verwijzing, - ) - - # file_digest() expects a file in binary mode, hence `infile.buffer.raw` - checksumWaarde = hashlib.file_digest(infile.buffer.raw, algorithm).hexdigest() - - checksumDatum = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") - - return ChecksumGegevens(checksumAlgoritme, checksumWaarde, checksumDatum) - - -def from_xml(mdto_xml: TextIO | str) -> Object: - """Construct a Informatieobject/Bestand object from a MDTO XML file. - - Note: - When `mdto_xml` is invalid MDTO, this function will probably throw an error. - - Example: - - ```python - import mdto - - # read informatieobject from file - informatieobject = mdto.from_xml("Voorbeeld Archiefstuk Informatieobject.xml") - - # edit the informatie object - informatieobject.naam = "Verlenen kapvergunning Flipje's Erf 15 Tiel" - - # override the original informatieobject XML - informatieobject.save("Voorbeeld Archiefstuk Informatieobject.xml") - ``` - - Args: - mdto_xml (TextIO | str): The MDTO XML file to construct an Informatieobject/Bestand from - - Returns: - Object: A new MDTO object (Bestand or Informatieobject) - """ - - # Parsers: - def parse_text(node) -> str: - return node.text - - def parse_int(node) -> int: - return int(node.text) - - def parse_identificatie(node) -> IdentificatieGegevens: - return IdentificatieGegevens( - node[0].text, - node[1].text, - ) - - # this is measurably faster than the elem_to_mdto variant - def parse_verwijzing(node) -> VerwijzingGegevens: - if len(node) == 1: - return VerwijzingGegevens(node[0].text) - else: - return VerwijzingGegevens( - node[0].text, - parse_identificatie(node[1]), - ) - - # FIXME: return value - def elem_to_mdto(elem: ET.Element, mdto_class: classmethod, mdto_xml_parsers: dict): - """Initialize MDTO class (TermijnGegevens, EventGegevens, etc.) with values - from a given XML node, using parsers specified in `mdto_xml_parsers`. - - Returns: - MDTO instance: a initialized MDTO instance of `mdto_class` - """ - # initialize dictionary of keyword arguments (to be passed to MDTO class constructor) - constructor_args = {mdto_field: [] for mdto_field in mdto_xml_parsers} - - for child in elem: - mdto_field = child.tag.removeprefix( - "{https://www.nationaalarchief.nl/mdto}" - ) - # retrieve parser - xml_parser = mdto_xml_parsers[mdto_field] - # add value of parsed child element to class constructor args - constructor_args[mdto_field].append(xml_parser(child)) - - # cleanup class constructor arguments - for argname, value in constructor_args.items(): - # Replace empty argument lists by None - if len(value) == 0: - constructor_args[argname] = None - # Replace one-itemed argument lists by their respective item - elif len(value) == 1: - constructor_args[argname] = value[0] - - return mdto_class(**constructor_args) - - begrip_parsers = { - "begripLabel": parse_text, - "begripCode": parse_text, - "begripBegrippenlijst": parse_verwijzing, - } - parse_begrip = lambda e: elem_to_mdto(e, BegripGegevens, begrip_parsers) - - termijn_parsers = { - "termijnTriggerStartLooptijd": parse_begrip, - "termijnStartdatumLooptijd": parse_text, - "termijnLooptijd": parse_text, - "termijnEinddatum": parse_text, - } - parse_termijn = lambda e: elem_to_mdto(e, TermijnGegevens, termijn_parsers) - - beperking_parsers = { - "beperkingGebruikType": parse_begrip, - "beperkingGebruikNadereBeschrijving": parse_text, - "beperkingGebruikDocumentatie": parse_verwijzing, - "beperkingGebruikTermijn": parse_termijn, - } - parse_beperking = lambda e: elem_to_mdto( - e, BeperkingGebruikGegevens, beperking_parsers - ) - - raadpleeglocatie_parsers = { - "raadpleeglocatieFysiek": parse_verwijzing, - "raadpleeglocatieOnline": parse_text, - } - parse_raadpleeglocatie = lambda e: elem_to_mdto( - e, RaadpleeglocatieGegevens, raadpleeglocatie_parsers - ) - - dekking_in_tijd_parsers = { - "dekkingInTijdType": parse_begrip, - "dekkingInTijdBegindatum": parse_text, - "dekkingInTijdEinddatum": parse_text, - } - parse_dekking_in_tijd = lambda e: elem_to_mdto( - e, DekkingInTijdGegevens, dekking_in_tijd_parsers - ) - - event_parsers = { - "eventType": parse_begrip, - "eventTijd": parse_text, - "eventVerantwoordelijkeActor": parse_verwijzing, - "eventResultaat": parse_text, - } - parse_event = lambda e: elem_to_mdto(e, EventGegevens, event_parsers) - - gerelateerd_informatieobject_parsers = { - "gerelateerdInformatieobjectVerwijzing": parse_verwijzing, - "gerelateerdInformatieobjectTypeRelatie": parse_begrip, - } - parse_gerelateerd_informatieobject = lambda e: elem_to_mdto( - e, GerelateerdInformatieobjectGegevens, gerelateerd_informatieobject_parsers - ) - - betrokkene_parsers = { - "betrokkeneTypeRelatie": parse_begrip, - "betrokkeneActor": parse_verwijzing, - } - parse_betrokkene = lambda e: elem_to_mdto(e, BetrokkeneGegevens, betrokkene_parsers) - - checksum_parsers = { - "checksumAlgoritme": parse_begrip, - "checksumWaarde": parse_text, - "checksumDatum": parse_text, - } - parse_checksum = lambda e: elem_to_mdto(e, ChecksumGegevens, checksum_parsers) - - informatieobject_parsers = { - "naam": parse_text, - "identificatie": parse_identificatie, - "aggregatieniveau": parse_begrip, - "classificatie": parse_begrip, - "trefwoord": parse_text, - "omschrijving": parse_text, - "raadpleeglocatie": parse_raadpleeglocatie, - "dekkingInTijd": parse_dekking_in_tijd, - "dekkingInRuimte": parse_verwijzing, - "taal": parse_text, - "event": parse_event, - "waardering": parse_begrip, - "bewaartermijn": parse_termijn, - "informatiecategorie": parse_begrip, - "isOnderdeelVan": parse_verwijzing, - "bevatOnderdeel": parse_verwijzing, - "heeftRepresentatie": parse_verwijzing, - "aanvullendeMetagegevens": parse_verwijzing, - "gerelateerdInformatieobject": parse_gerelateerd_informatieobject, - "archiefvormer": parse_verwijzing, - "betrokkene": parse_betrokkene, - "activiteit": parse_verwijzing, - "beperkingGebruik": parse_beperking, - } - parse_informatieobject = lambda e: elem_to_mdto( - e, Informatieobject, informatieobject_parsers - ) - - bestand_parsers = { - "naam": parse_text, - "identificatie": parse_identificatie, - "omvang": parse_int, - "checksum": parse_checksum, - "bestandsformaat": parse_begrip, - "URLBestand": parse_text, - "isRepresentatieVan": parse_verwijzing, - } - parse_bestand = lambda e: elem_to_mdto(e, Bestand, bestand_parsers) - - # read xmlfile - tree = ET.parse(mdto_xml) - root = tree.getroot() - children = list(root[0]) - - # check if object type is Bestand or Informatieobject - object_type = root[0].tag.removeprefix("{https://www.nationaalarchief.nl/mdto}") - - if object_type == "informatieobject": - return parse_informatieobject(children) - elif object_type == "bestand": - return parse_bestand(children) - else: - raise ValueError( - f"Unexpected first child <{object_type}> in {mdto_xml}: " - "expected or ." - ) diff --git a/mdto/gegevensgroepen.py b/mdto/gegevensgroepen.py index e9a665f..b5fdaba 100644 --- a/mdto/gegevensgroepen.py +++ b/mdto/gegevensgroepen.py @@ -1,4 +1,4 @@ -from .mdto import ( +from .classes import ( IdentificatieGegevens, VerwijzingGegevens, BegripGegevens, @@ -14,7 +14,7 @@ Bestand, ) -# Expose desired classes +# Expose public classes __all__ = [ "IdentificatieGegevens", "VerwijzingGegevens", diff --git a/mdto/helpers.py b/mdto/helpers.py index f683ae2..e0f978d 100644 --- a/mdto/helpers.py +++ b/mdto/helpers.py @@ -1,9 +1,20 @@ # Private helper methods from pathlib import Path from typing import List, TextIO +import logging import validators +# setup logging +logging.basicConfig( + format="[%(asctime)s] %(levelname)s: %(message)s", datefmt="%H:%M:%S" +) +logging.addLevelName( + # colorize warning messages + logging.WARNING, + "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING), +) + def process_file(file_or_filename) -> TextIO: """Return file-object if input is already a file. diff --git a/mdto/utilities.py b/mdto/utilities.py new file mode 100644 index 0000000..6ec2fba --- /dev/null +++ b/mdto/utilities.py @@ -0,0 +1,534 @@ +import hashlib +import json +import os +import shutil +import subprocess +from datetime import datetime +from pathlib import Path +from typing import List, TextIO + +import lxml.etree as ET + +from mdto.gegevensgroepen import * + +from . import helpers + + +def _pronominfo_fido(file: str | Path) -> BegripGegevens: + # Note: fido currently lacks a public API + # Hence, the most robust solution is to invoke fido as a cli program + # Upstream issue: https://github.com/openpreserve/fido/issues/94 + # FIXME: log more warnings from fido? + cmd = [ + "fido", + "-q", + "-matchprintf", + "OK,%(info.formatname)s,%(info.puid)s,\n", + "-nomatchprintf", + "FAIL", + file, + ] + + result = subprocess.run( + cmd, capture_output=True, shell=False, text=True, check=True + ) + + stdout, stderr = result.stdout, result.stderr + + # fido prints warnings about empty files to stderr + if "(empty)" in stderr.lower(): + helpers.logging.warning(f"{file} appears to be an empty file") + + # found a match! + if stdout.startswith("OK"): + matches = stdout.rstrip().split("\n") + if len(matches) > 1: + helpers.logging.warning( + "fido returned more than one PRONOM match " + f"for {file}. Selecting the first one." + ) + + # strip "OK" from the output + matches = matches[0].split(",")[1:] + verwijzing = VerwijzingGegevens(verwijzingNaam="PRONOM-register") + return BegripGegevens( + begripLabel=matches[0], + begripCode=matches[1], + begripBegrippenlijst=verwijzing, + ) + else: + raise RuntimeError(f"fido PRONOM detection failed on {file}") + + +def _pronominfo_siegfried(file: str | Path) -> BegripGegevens: + cmd = ["sf", "--json", "--sym", file] + + result = subprocess.run( + cmd, capture_output=True, shell=False, text=True, check=True + ) + + # extract info about first file, since only one file is being passed to sf + sf_json = json.loads(result.stdout)["files"][0] + + if "empty" in sf_json["errors"]: + helpers.logging.warning(f"{file} appears to be an empty file") + + # extract match + matches = sf_json["matches"] + if len(matches) > 1: + helpers.logging.warning( + "siegfried returned more than one PRONOM match " + f"for {file}. Selecting the first one." + ) + match = matches[0] + + # check if a match was found + if match["id"] == "UNKNOWN": + raise RuntimeError( + f"siegfried failed to detect PRONOM information about {file}" + ) + + # log sf's warnings (such as extension mismatches) + warning = match["warning"] + if warning: + helpers.logging.warning(f"siegfried reports PRONOM warning about {file}: {warning}") + + return BegripGegevens( + begripLabel=match["format"], + begripCode=match["id"], + begripBegrippenlijst=VerwijzingGegevens("PRONOM-register"), + ) + + +def pronominfo(file: str | Path) -> BegripGegevens: + """Generate PRONOM information about `file`. This information can be used in + a Bestand's `` tag. + + mdto.py supports two backends for PRONOM detection: fido and sf + (siegfried). The default backend is sf; unless sf is not found, in which + case fido is used as an automatic fallback. Set the environment variable + `PRONOM_BACKEND` to fido/siegfried to manually select a backend + (e.g. `PRONOM_BACKEND=fido your_script.py ...`). + + Args: + file (str | Path): Path to the file to inspect + + Returns: + BegripGegevens: Object with the following attributes: + - `begripLabel`: The file's PRONOM signature name + - `begripCode`: The file's PRONOM ID + - `begripBegrippenLijst`: A reference to the PRONOM registry + """ + # check if file exists and is indeed a file (as opposed to a directory) + if not os.path.isfile(file): + raise TypeError(f"File '{file}' does not exist or might be a directory") + + siegfried_found = shutil.which("sf") + fido_found = shutil.which("fido") + pronom_backend = os.environ.get("PRONOM_BACKEND", None) + + if pronom_backend is not None and pronom_backend not in ("fido", "siegfried", "sf"): + raise ValueError( + f"invalid PRONOM backend '{pronom_backend}' specified in PRONOM_BACKEND. " + "Valid options are 'fido' or 'sf'" + ) + + # If PRONOM_BACKEND is not set, default to siegfried, unless siegfried is not found. + # In that case, fallback to fido. + if pronom_backend is None: + if siegfried_found: + pronom_backend = "sf" + elif fido_found: + pronom_backend = "fido" + else: + raise RuntimeError( + "Neither 'fido' nor 'sf' (siegfried) appear to be installed. " + "At least one of these program is required for PRONOM detection. " + "For installation instructions, " + "see https://github.com/openpreserve/fido#installation (fido) " + "or https://github.com/richardlehane/siegfried#install (siegfried)" + ) + + if pronom_backend in ("sf", "siegfried"): + if not siegfried_found: + raise RuntimeError( + "Program 'sf' (siegfried) not found. " + "For installation instructions, see https://github.com/richardlehane/siegfried#install" + ) + # log choice? + return _pronominfo_siegfried(file) + + elif pronom_backend == "fido": + if not fido_found: + raise RuntimeError( + "Program 'fido' not found. " + "For installation instructions, see https://github.com/openpreserve/fido#installation" + ) + # log choice? + return _pronominfo_fido(file) + + +def _detect_verwijzing(informatieobject: TextIO | str) -> VerwijzingGegevens: + """A Bestand object must contain a reference to a corresponding + informatieobject. Specifically, it expects an tag with + the following children: + + 1. : name of the informatieobject + 2. (optional): reference to the informatieobject's + ID and source thereof + + This function infers these so-called 'VerwijzingGegevens' by parsing the XML + of the file `informatieobject`. + + Args: + informatieobject (TextIO | str): XML file to infer VerwijzingGegevens from + + Returns: + VerwijzingGegevens: reference to the informatieobject specified by `informatieobject` + """ + + id_gegevens = None + namespaces = {"mdto": "https://www.nationaalarchief.nl/mdto"} + tree = ET.parse(informatieobject) + root = tree.getroot() + + id_xpath = ".//mdto:informatieobject/mdto:identificatie/" + + kenmerk = root.find(f"{id_xpath}mdto:identificatieKenmerk", namespaces=namespaces) + bron = root.find(f"{id_xpath}mdto:identificatieBron", namespaces=namespaces) + naam = root.find(".//mdto:informatieobject/mdto:naam", namespaces=namespaces) + + if None in [kenmerk, bron]: + raise ValueError(f"Failed to detect in {informatieobject}") + + identificatie = IdentificatieGegevens(kenmerk.text, bron.text) + + if naam is None: + raise ValueError(f"Failed to detect in {informatieobject}") + + return VerwijzingGegevens(naam.text, identificatie) + + +def bestand_from_file( + file: TextIO | str, + identificatie: IdentificatieGegevens | List[IdentificatieGegevens], + isrepresentatievan: VerwijzingGegevens | TextIO | str, + url: str = None, +) -> Bestand: + """Convenience function for creating a Bestand object from a file. + + This function differs from calling Bestand() directly in that it + infers most technical information for you (checksum, PRONOM info, + etc.) by inspecting `file`. The value of , for example, is + always set to the basename of `file`. + + + Args: + file (TextIO | str): the file the Bestand object represents + identificatie (IdentificatieGegevens | List[IdentificatieGegevens]): + identificatiekenmerk of Bestand object + isrepresentatievan (TextIO | str | VerwijzingGegevens): a XML + file containing an informatieobject, or a + VerwijzingGegevens referencing an informatieobject. + Used to construct the values for . + url (Optional[str]): value of + + Example: + ```python + + verwijzing_obj = VerwijzingGegevens("vergunning.mdto.xml") + bestand = mdto.bestand_from_file( + "vergunning.pdf", + IdentificatieGegevens('34c5-4379-9f1a-5c378', 'Proza (DMS)'), + isrepresentatievan=verwijzing_obj # or pass the actual file + ) + bestand.save("vergunning.bestand.mdto.xml") + ``` + + Returns: + Bestand: new Bestand object + """ + file = helpers.process_file(file) + + # set to basename + naam = os.path.basename(file.name) + + omvang = os.path.getsize(file.name) + bestandsformaat = pronominfo(file.name) + checksum = create_checksum(file) + + # file or file path? + if isinstance(isrepresentatievan, (str, Path)) or hasattr( + isrepresentatievan, "read" + ): + informatieobject_file = helpers.process_file(isrepresentatievan) + # Construct verwijzing from informatieobject file + verwijzing_obj = _detect_verwijzing(informatieobject_file) + informatieobject_file.close() + elif isinstance(isrepresentatievan, VerwijzingGegevens): + verwijzing_obj = isrepresentatievan + else: + raise TypeError( + "isrepresentatievan must either be a path/file, or a VerwijzingGegevens object." + ) + + file.close() + + return Bestand( + identificatie, naam, omvang, bestandsformaat, checksum, verwijzing_obj, url + ) + + +def create_checksum( + file_or_filename: TextIO | str, algorithm: str = "sha256" +) -> ChecksumGegevens: + """Convience function for creating ChecksumGegegevens objects. + + Takes a file-like object or path to file, and then generates the requisite + checksum metadata (i.e. `checksumAlgoritme`, `checksumWaarde`, and + `checksumDatum`) from that file. + + Example: + + ```python + pdf_checksum = create_checksum('document.pdf') + # create ChecksumGegevens with a 512 bits instead of a 256 bits checksum + jpg_checksum = create_checksum('scan-003.jpg', algorithm="sha512") + ``` + + Args: + infile (TextIO | str): file-like object to generate checksum data for + algorithm (Optional[str]): checksum algorithm to use; defaults to sha256. + For valid values, see https://docs.python.org/3/library/hashlib.html + + Returns: + ChecksumGegevens: checksum metadata from `file_or_filename` + """ + infile = helpers.process_file(file_or_filename) + verwijzing = VerwijzingGegevens( + verwijzingNaam="Begrippenlijst ChecksumAlgoritme MDTO" + ) + + checksumAlgoritme = BegripGegevens( + begripLabel=algorithm.upper().replace("SHA", "SHA-"), + begripBegrippenlijst=verwijzing, + ) + + # file_digest() expects a file in binary mode, hence `infile.buffer.raw` + checksumWaarde = hashlib.file_digest(infile.buffer.raw, algorithm).hexdigest() + + checksumDatum = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + + return ChecksumGegevens(checksumAlgoritme, checksumWaarde, checksumDatum) + + +def from_xml(mdto_xml: TextIO | str) -> Informatieobject | Bestand: + """Construct a Informatieobject/Bestand object from a MDTO XML file. + + Note: + When `mdto_xml` is invalid MDTO, this function will probably throw an error. + + Example: + + ```python + import mdto + + # read informatieobject from file + informatieobject = mdto.from_xml("Voorbeeld Archiefstuk Informatieobject.xml") + + # edit the informatie object + informatieobject.naam = "Verlenen kapvergunning Flipje's Erf 15 Tiel" + + # override the original informatieobject XML + informatieobject.save("Voorbeeld Archiefstuk Informatieobject.xml") + ``` + + Args: + mdto_xml (TextIO | str): The MDTO XML file to construct an Informatieobject/Bestand from + + Returns: + Bestand | Informatieobject: A new MDTO object + """ + + # Parsers: + def parse_text(node) -> str: + return node.text + + def parse_int(node) -> int: + return int(node.text) + + def parse_identificatie(node) -> IdentificatieGegevens: + return IdentificatieGegevens( + node[0].text, + node[1].text, + ) + + # this is measurably faster than the elem_to_mdto variant + def parse_verwijzing(node) -> VerwijzingGegevens: + if len(node) == 1: + return VerwijzingGegevens(node[0].text) + else: + return VerwijzingGegevens( + node[0].text, + parse_identificatie(node[1]), + ) + + # FIXME: return value + def elem_to_mdto(elem: ET.Element, mdto_class: classmethod, mdto_xml_parsers: dict): + """Initialize MDTO class (TermijnGegevens, EventGegevens, etc.) with values + from a given XML node, using parsers specified in `mdto_xml_parsers`. + + Returns: + MDTO instance: a initialized MDTO instance of `mdto_class` + """ + # initialize dictionary of keyword arguments (to be passed to MDTO class constructor) + constructor_args = {mdto_field: [] for mdto_field in mdto_xml_parsers} + + for child in elem: + mdto_field = child.tag.removeprefix( + "{https://www.nationaalarchief.nl/mdto}" + ) + # retrieve parser + xml_parser = mdto_xml_parsers[mdto_field] + # add value of parsed child element to class constructor args + constructor_args[mdto_field].append(xml_parser(child)) + + # cleanup class constructor arguments + for argname, value in constructor_args.items(): + # Replace empty argument lists by None + if len(value) == 0: + constructor_args[argname] = None + # Replace one-itemed argument lists by their respective item + elif len(value) == 1: + constructor_args[argname] = value[0] + + return mdto_class(**constructor_args) + + begrip_parsers = { + "begripLabel": parse_text, + "begripCode": parse_text, + "begripBegrippenlijst": parse_verwijzing, + } + parse_begrip = lambda e: elem_to_mdto(e, BegripGegevens, begrip_parsers) + + termijn_parsers = { + "termijnTriggerStartLooptijd": parse_begrip, + "termijnStartdatumLooptijd": parse_text, + "termijnLooptijd": parse_text, + "termijnEinddatum": parse_text, + } + parse_termijn = lambda e: elem_to_mdto(e, TermijnGegevens, termijn_parsers) + + beperking_parsers = { + "beperkingGebruikType": parse_begrip, + "beperkingGebruikNadereBeschrijving": parse_text, + "beperkingGebruikDocumentatie": parse_verwijzing, + "beperkingGebruikTermijn": parse_termijn, + } + parse_beperking = lambda e: elem_to_mdto( + e, BeperkingGebruikGegevens, beperking_parsers + ) + + raadpleeglocatie_parsers = { + "raadpleeglocatieFysiek": parse_verwijzing, + "raadpleeglocatieOnline": parse_text, + } + parse_raadpleeglocatie = lambda e: elem_to_mdto( + e, RaadpleeglocatieGegevens, raadpleeglocatie_parsers + ) + + dekking_in_tijd_parsers = { + "dekkingInTijdType": parse_begrip, + "dekkingInTijdBegindatum": parse_text, + "dekkingInTijdEinddatum": parse_text, + } + parse_dekking_in_tijd = lambda e: elem_to_mdto( + e, DekkingInTijdGegevens, dekking_in_tijd_parsers + ) + + event_parsers = { + "eventType": parse_begrip, + "eventTijd": parse_text, + "eventVerantwoordelijkeActor": parse_verwijzing, + "eventResultaat": parse_text, + } + parse_event = lambda e: elem_to_mdto(e, EventGegevens, event_parsers) + + gerelateerd_informatieobject_parsers = { + "gerelateerdInformatieobjectVerwijzing": parse_verwijzing, + "gerelateerdInformatieobjectTypeRelatie": parse_begrip, + } + parse_gerelateerd_informatieobject = lambda e: elem_to_mdto( + e, GerelateerdInformatieobjectGegevens, gerelateerd_informatieobject_parsers + ) + + betrokkene_parsers = { + "betrokkeneTypeRelatie": parse_begrip, + "betrokkeneActor": parse_verwijzing, + } + parse_betrokkene = lambda e: elem_to_mdto(e, BetrokkeneGegevens, betrokkene_parsers) + + checksum_parsers = { + "checksumAlgoritme": parse_begrip, + "checksumWaarde": parse_text, + "checksumDatum": parse_text, + } + parse_checksum = lambda e: elem_to_mdto(e, ChecksumGegevens, checksum_parsers) + + informatieobject_parsers = { + "naam": parse_text, + "identificatie": parse_identificatie, + "aggregatieniveau": parse_begrip, + "classificatie": parse_begrip, + "trefwoord": parse_text, + "omschrijving": parse_text, + "raadpleeglocatie": parse_raadpleeglocatie, + "dekkingInTijd": parse_dekking_in_tijd, + "dekkingInRuimte": parse_verwijzing, + "taal": parse_text, + "event": parse_event, + "waardering": parse_begrip, + "bewaartermijn": parse_termijn, + "informatiecategorie": parse_begrip, + "isOnderdeelVan": parse_verwijzing, + "bevatOnderdeel": parse_verwijzing, + "heeftRepresentatie": parse_verwijzing, + "aanvullendeMetagegevens": parse_verwijzing, + "gerelateerdInformatieobject": parse_gerelateerd_informatieobject, + "archiefvormer": parse_verwijzing, + "betrokkene": parse_betrokkene, + "activiteit": parse_verwijzing, + "beperkingGebruik": parse_beperking, + } + parse_informatieobject = lambda e: elem_to_mdto( + e, Informatieobject, informatieobject_parsers + ) + + bestand_parsers = { + "naam": parse_text, + "identificatie": parse_identificatie, + "omvang": parse_int, + "checksum": parse_checksum, + "bestandsformaat": parse_begrip, + "URLBestand": parse_text, + "isRepresentatieVan": parse_verwijzing, + } + parse_bestand = lambda e: elem_to_mdto(e, Bestand, bestand_parsers) + + # read xmlfile + tree = ET.parse(mdto_xml) + root = tree.getroot() + children = list(root[0]) + + # check if object type is Bestand or Informatieobject + object_type = root[0].tag.removeprefix("{https://www.nationaalarchief.nl/mdto}") + + if object_type == "informatieobject": + return parse_informatieobject(children) + elif object_type == "bestand": + return parse_bestand(children) + else: + raise ValueError( + f"Unexpected first child <{object_type}> in {mdto_xml}: " + "expected or ." + ) diff --git a/tests/test_pronom_detection.py b/tests/test_pronom_detection.py index 77d92b6..11f01a1 100644 --- a/tests/test_pronom_detection.py +++ b/tests/test_pronom_detection.py @@ -1,6 +1,7 @@ import pytest -import mdto + from mdto.gegevensgroepen import * +from mdto.utilities import _pronominfo_fido, _pronominfo_siegfried def test_pronom_siegfried(voorbeeld_archiefstuk_xml): @@ -8,7 +9,7 @@ def test_pronom_siegfried(voorbeeld_archiefstuk_xml): expected = BegripGegevens( "Extensible Markup Language", VerwijzingGegevens("PRONOM-register"), "fmt/101" ) - got = mdto._pronominfo_siegfried(voorbeeld_archiefstuk_xml) + got = _pronominfo_siegfried(voorbeeld_archiefstuk_xml) assert expected == got @@ -17,5 +18,5 @@ def test_pronom_fido(voorbeeld_archiefstuk_xml): expected = BegripGegevens( "Extensible Markup Language", VerwijzingGegevens("PRONOM-register"), "fmt/101" ) - got = mdto._pronominfo_fido(voorbeeld_archiefstuk_xml) + got = _pronominfo_fido(voorbeeld_archiefstuk_xml) assert expected == got