Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XML generation and parsing #43

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions cimgen/cimgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ def asJson(self):
jsonObject = {}
if self.about() is not None:
jsonObject["about"] = self.about()
if self.namespace() is not None:
jsonObject["namespace"] = self.namespace()
if self.comment() is not None:
jsonObject["comment"] = self.comment()
if self.dataType() is not None:
Expand Down Expand Up @@ -51,6 +53,12 @@ def about(self):
else:
return None

def namespace(self):
if "$rdf:about" in self.jsonDefinition:
return self.jsonDefinition["$rdf:about"][: -len(self.about())]
else:
return None

# Capitalized True/False is valid in python but not in json.
# Do not use this function in combination with json.load()
def is_used(self) -> bool:
Expand Down Expand Up @@ -211,6 +219,7 @@ def __init__(self, rdfsEntry):
self.super = rdfsEntry.subClassOf()
self.subclasses = []
self.stereotype = rdfsEntry.stereotype()
self.namespace = rdfsEntry.namespace()

def attributes(self):
return self.attribute_list
Expand Down Expand Up @@ -405,6 +414,13 @@ def _parse_rdf(input_dic, version): # NOSONAR
for instance in enum_instances:
clarse = _get_rid_of_hash(instance["type"])
if clarse and clarse in classes_map:
if instance.get("comment"):
instance["wrapped_comment"] = wrap_and_clean(
instance["comment"],
width=100,
initial_indent="# ",
subsequent_indent=(" # "),
)
classes_map[clarse].add_enum_instance(instance)
else:
logger.info("Class {} for enum instance {} not found.".format(clarse, instance))
Expand All @@ -429,6 +445,7 @@ def _write_python_files(elem_dict, lang_pack, output_path, version):
"class_location": lang_pack.get_class_location(class_name, elem_dict, version),
"class_name": class_name,
"class_origin": elem_dict[class_name].origins(),
"class_namespace": _get_namespace(elem_dict[class_name].namespace),
"enum_instances": elem_dict[class_name].enum_instances(),
"is_an_enum_class": elem_dict[class_name].is_an_enum_class(),
"is_a_primitive_class": elem_dict[class_name].is_a_primitive_class(),
Expand Down Expand Up @@ -467,6 +484,7 @@ def _write_python_files(elem_dict, lang_pack, output_path, version):
attribute["is_primitive_attribute"] = _get_bool_string(attribute_type == "primitive")
attribute["is_datatype_attribute"] = _get_bool_string(attribute_type == "datatype")
attribute["attribute_class"] = attribute_class
attribute["attribute_namespace"] = _get_namespace(attribute["namespace"])

class_details["attributes"].sort(key=lambda d: d["label"])
_write_files(class_details, output_path, version)
Expand Down Expand Up @@ -749,6 +767,14 @@ def _get_attribute_type(attribute: dict, class_infos: CIMComponentDefinition) ->
return attribute_type


def _get_namespace(parsed_namespace: str) -> str:
if parsed_namespace == "#":
namespace = cim_namespace
else:
namespace = parsed_namespace
return namespace


def _get_bool_string(bool_value: bool) -> str:
"""Convert boolean value into a string which is usable in both Python and Json.

Expand Down
9 changes: 0 additions & 9 deletions cimgen/languages/modernpython/lang_pack.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,6 @@ def _get_python_type(datatype):
return "float"


def _set_imports(attributes):
import_set = set()
for attribute in attributes:
if attribute["is_datatype_attribute"] or attribute["is_primitive_attribute"]:
import_set.add(attribute["attribute_class"])
return sorted(import_set)


def _set_datatype_attributes(attributes) -> dict:
datatype_attributes = {}
datatype_attributes["python_type"] = "None"
Expand Down Expand Up @@ -136,7 +128,6 @@ def run_template(output_path, class_details):
template = class_template_file
class_details["setDefault"] = _set_default
class_details["setType"] = _set_type
class_details["imports"] = _set_imports(class_details["attributes"])
resource_file = _create_file(output_path, class_details, template)
_write_templated_file(resource_file, class_details, template["filename"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ from pydantic.dataclasses import dataclass

from ..utils.profile import BaseProfile, Profile
from {{class_location}} import {{sub_class_of}}
{{#imports}}
from .{{.}} import {{.}}
{{/imports}}


@dataclass
Expand All @@ -35,16 +32,17 @@ class {{class_name}}({{sub_class_of}}):
{{/attr_origin}}
],
"is_used": {{#is_used}}True{{/is_used}}{{^is_used}}False{{/is_used}},
"namespace": "{{attribute_namespace}}", # NOSONAR
"is_class_attribute": {{#is_class_attribute}}True{{/is_class_attribute}}{{^is_class_attribute}}False{{/is_class_attribute}},
"is_datatype_attribute": {{#is_datatype_attribute}}True{{/is_datatype_attribute}}{{^is_datatype_attribute}}False{{/is_datatype_attribute}},
"is_enum_attribute": {{#is_enum_attribute}}True{{/is_enum_attribute}}{{^is_enum_attribute}}False{{/is_enum_attribute}},
"is_list_attribute": {{#is_list_attribute}}True{{/is_list_attribute}}{{^is_list_attribute}}False{{/is_list_attribute}},
"is_primitive_attribute": {{#is_primitive_attribute}}True{{/is_primitive_attribute}}{{^is_primitive_attribute}}False{{/is_primitive_attribute}},
{{#is_datatype_attribute}}
"attribute_class": {{attribute_class}},
"attribute_class": "{{attribute_class}}",
{{/is_datatype_attribute}}
{{#is_primitive_attribute}}
"attribute_class": {{attribute_class}},
"attribute_class": "{{attribute_class}}",
{{/is_primitive_attribute}}
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ from enum import Enum

class {{class_name}}(str, Enum):
"""
{{{class_comment}}} # noqa: E501
{{{wrapped_class_comment}}}
"""

{{#enum_instances}}
{{label}} = "{{label}}"{{#comment}} # {{comment}}{{/comment}} # noqa: E501
{{label}} = "{{label}}"{{#comment}}
{{wrapped_comment}}{{/comment}}
{{/enum_instances}}
225 changes: 203 additions & 22 deletions cimgen/languages/modernpython/utils/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Drop in dataclass replacement, allowing easier json dump and validation in the future.
import importlib
from lxml import etree
from dataclasses import Field, fields
from functools import cached_property
from typing import Any, TypeAlias, TypedDict
from typing import Any, TypeAlias, TypedDict, Optional

from pydantic.dataclasses import dataclass

Expand Down Expand Up @@ -133,38 +134,218 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str,
for f in fields(parent):
shortname = f.name
qualname = f"{parent.apparent_name()}.{shortname}" # type: ignore
infos = dict()

if f not in self.cgmes_attribute_names_in_profile(profile) or shortname in seen_attrs:
# Wrong profile or already found from a parent.
continue
else:
# Namespace finding
# "class namespace" means the first namespace defined in the inheritance tree.
# This can go up to Base, which will give the default cim NS.
if (extra := getattr(f.default, "json_schema_extra", None)) is None:
# The attribute does not have extra metadata. It might be a custom atttribute
# without it, or a base type (int...).
# Use the class namespace.
namespace = self.namespace
elif (attr_ns := extra.get("namespace", None)) is None:
# The attribute has some extras, but not namespace.
# Use the class namespace.
namespace = self.namespace
else:

# Namespace finding
# "class namespace" means the first namespace defined in the inheritance tree.
# This can go up to Base, which will give the default cim NS.
infos["namespace"] = self.namespace
extra = getattr(f.default, "json_schema_extra", None)
if extra and extra.get("is_used"):
# adding the extras, used for xml generation
extra_info = {
"attr_name": qualname,
"is_class_attribute": extra.get("is_class_attribute"),
"is_enum_attribute": extra.get("is_enum_attribute"),
"is_list_attribute": extra.get("is_list_attribute"),
"is_primitive_attribute": extra.get("is_primitive_attribute"),
"is_datatype_attribute": extra.get("is_datatype_attribute"),
"attribute_class": extra.get("attribute_class"),
}
if extra.get("namespace"):
# The attribute has an explicit namesapce
namespace = attr_ns
extra_info["namespace"] = extra.get("namespace", self.namespace)
infos.update(extra_info)

infos["value"] = getattr(self, shortname)

qual_attrs[qualname] = CgmesAttribute(
value=getattr(self, shortname),
namespace=namespace,
)
seen_attrs.add(shortname)
qual_attrs[qualname] = CgmesAttribute(infos)
seen_attrs.add(shortname)

return qual_attrs

def to_xml(self, profile_to_export: BaseProfile, id: Optional[str] = None) -> Optional[etree.Element]:
"""Creates an etree element of self with all non-empty attributes of the profile_to_export
that are not already defined in the recommanded profile
This can then be used to generate the xml file of the profile_to_export
Args:
profile_to_export (Profile): Profile for which we want to obtain the xml tree (eg. Profile.EQ)
id (Optional[str], optional): "_mRID", optional: some objects don't have mRID attribute. Defaults to None.
Returns:
Optional[etree.Element]: etree describing self for the profile_to_export, None if nothing to export
"""
profile_attributes = self._get_attributes_to_export(profile_to_export)

if "mRID" in self.to_dict():
obj_id = "_" + self.mRID
else:
obj_id = id

# if no attribute to export or no mRID, return None
if profile_attributes == {} or obj_id is None:
root = None
else:
# Create root element
nsmap = NAMESPACES
root = etree.Element("{" + self.namespace + "}" + self.resource_name, nsmap=nsmap)

# Add the ID as attribute to the root
if self.recommended_profile.value == profile_to_export.value:
root.set(f"""{{{nsmap["rdf"]}}}""" + "ID", obj_id)
else:
root.set(f"""{{{nsmap["rdf"]}}}""" + "about", "#" + obj_id)

root = self._add_attribute_to_etree(attributes=profile_attributes, root=root, nsmap=nsmap)
return root

def _get_attributes_to_export(self, profile_to_export: BaseProfile) -> dict:
attributes_to_export = self.cgmes_attributes_in_profile(profile_to_export)
is_recommanded_profile = self.recommended_profile.value == profile_to_export.value
if not is_recommanded_profile:
# remove attributes that are also present in "recommended_profile"
attributes_main = self.cgmes_attributes_in_profile(self.recommended_profile)
for key in attributes_main.keys():
if key in attributes_to_export:
del attributes_to_export[key]
attributes_to_export = self._remove_empty_attributes(attributes_to_export)
return attributes_to_export

@staticmethod
def _remove_empty_attributes(attributes: dict) -> dict:
for key, attribute in list(attributes.items()):
# Remove empty attributes
if attribute["value"] is None or attribute["value"] == "":
del attributes[key]
elif attribute.get("attribute_class") and attribute["attribute_class"] == "Boolean":
attribute["value"] = str(attribute["value"]).lower()
return attributes

@staticmethod
def _add_attribute_to_etree(attributes: dict, root: etree.Element, nsmap: dict) -> etree.Element:
rdf_namespace = f"""{{{nsmap["rdf"]}}}"""
for field_name, attribute in attributes.items():
# add all attributes relevant to the profile as SubElements
attr_namespace = attribute["namespace"]
element_name = f"{{{attr_namespace}}}{field_name}"

if attribute["is_class_attribute"]:
# class_attributes are exported as rdf: resource #_mRID_of_target
element = etree.SubElement(root, element_name)
element.set(rdf_namespace + "resource", "#" + attribute["value"])
elif attribute["is_enum_attribute"]:
element = etree.SubElement(root, element_name)
element.set(rdf_namespace + "resource", nsmap["cim"] + attribute["value"])
else:
element = etree.SubElement(root, element_name)
element.text = str(attribute["value"])
return root

def __str__(self) -> str:
"""Returns the string representation of this resource."""
return "\n".join([f"{k}={v}" for k, v in sorted(self.to_dict().items())])

def _parse_xml_fragment(self, xml_fragment: str) -> dict:
"""parses an xml fragment into a dict defining the class attributes

Args:
xml_fragment (str): xml string defining an instance of the current class

Returns:
attribute_dict (dict): a dictionnary of attributes to create/update the class instance
"""
attribute_dict = {}
xml_tree = etree.fromstring(xml_fragment)

# raise an error if the xml does not describe the expected class
if not xml_tree.tag.endswith(self.resource_name):
raise (KeyError(f"The fragment does not correspond to the class {self.resource_name}"))

attribute_dict.update(self._extract_mrid_from_etree(xml_tree=xml_tree))

# parsing attributes defined in class
class_attributes = self.cgmes_attributes_in_profile(None)
for key, class_attribute in class_attributes.items():
xml_attribute = xml_tree.findall(".//{*}" + key)
if len(xml_attribute) != 1:
continue
xml_attribute = xml_attribute[0]
attr = key.rsplit(".")[-1]

attr_value = self._extract_attr_value_from_etree(attr, class_attribute, xml_attribute)
if hasattr(self, attr) and attr_value is not None:
attribute_dict[attr] = attr_value

return attribute_dict

def _extract_mrid_from_etree(self, xml_tree: etree.Element) -> dict:
"""Parsing the mRID from etree attributes"""
mrid_dict = {}
for key, value in xml_tree.attrib.items():
if key.endswith("ID") or key.endswith("about"):
if value.startswith("#"):
value = value[1:]
if value.startswith("_"):
value = value[1:]
if hasattr(self, "mRID") and value is not None:
mrid_dict = {"mRID": value}
return mrid_dict

def _extract_attr_value_from_etree(self, attr_name: str, class_attribute: dict, xml_attribute: etree.Element):
"""Parsing the attribute value from etree attributes"""
attr_value = None
# class attributes are pointing to another class/instance defined in .attrib
if class_attribute["is_class_attribute"] and len(xml_attribute.keys()) == 1:
attr_value = xml_attribute.values()[0]
if attr_value.startswith("#"):
attr_value = attr_value[1:]

# enum attributes are defined in .attrib and has a prefix ending in "#"
elif class_attribute["is_enum_attribute"] and len(xml_attribute.keys()) == 1:
attr_value = xml_attribute.values()[0]
if "#" in attr_value:
attr_value = attr_value.split("#")[-1]

elif class_attribute["is_list_attribute"]:
attr_value = xml_attribute
attr_value = self.key.append(attr_value)
elif class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]:
attr_value = xml_attribute.text
if self.__dataclass_fields__[attr_name].type == bool:
attr_value = {"true": True, "false": False}.get(attr_value, None)
else:
# types are int, float or str (date, time and datetime treated as str)
attr_value = self.__dataclass_fields__[attr_name].type(attr_value)
else:
# other attributes types are defined in .text
attr_value = xml_attribute.text
return attr_value

def update_from_xml(self, xml_fragment: str):
"""
Updates the instance by parsing an xml fragment defining the attributes of this instance
example: updating the instance by parsing the corresponding fragment from the SSH profile
"""
attribute_dict = self._parse_xml_fragment(xml_fragment)

if attribute_dict["mRID"] == self.mRID:
for key, value in attribute_dict.items():
setattr(self, key, value)

@classmethod
def from_xml(cls, xml_fragment: str):
"""
Returns an instance of the class from an xml fragment defining the attributes written in the form:
<cim:IdentifiedObject.name>...</cim:IdentifiedObject.name>
example: creating an instance by parsing a fragment from the EQ profile
"""
attribute_dict = cls()._parse_xml_fragment(xml_fragment)

# Instantiate the class with the dictionary
return cls(**attribute_dict)

@staticmethod
def get_extra_prop(field: Field, prop: str) -> Any:
# The field is defined as a pydantic field, not a dataclass field,
Expand Down
Loading