Skip to content

Commit

Permalink
Initial implementation of an RDF file parser (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
avaucher authored Sep 12, 2023
1 parent c193c27 commit ccbd55e
Show file tree
Hide file tree
Showing 18 changed files with 1,533 additions and 0 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ console_scripts =
rxn-combine-reaction = rxn.chemutils.scripts.combine_reaction:main
rxn-detokenize = rxn.chemutils.scripts.detokenize:main
rxn-tokenize = rxn.chemutils.scripts.tokenize:main
rxn-rdf-to-smiles = rxn.chemutils.scripts.rdf_to_smiles:main

[flake8]
extend-ignore = E203, E501
Expand Down
29 changes: 29 additions & 0 deletions src/rxn/chemutils/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,35 @@ def sanitize_mol(
raise SanitizationError(mol) from e


def mdl_to_smiles(
    mdl: str,
    sanitize: bool = True,
    canonicalize: bool = True,
    default: Optional[str] = None,
) -> str:
    """
    Build a SMILES string from an MDL molecule description (molblock).

    Args:
        mdl: molblock to convert.
        sanitize: forwarded to ``mdl_to_mol``. May be set to False to avoid
            the RDKit valence check.
        canonicalize: forwarded to ``mol_to_smiles`` as ``canonical``.
        default: fallback returned when reading the molblock raises
            ``InvalidMdl``. When left to None, the exception propagates.

    Returns:
        The SMILES string, or ``default`` if the molblock could not be read
        and a default was given.

    Raises:
        InvalidMdl: if the molblock cannot be read and no default was given.
    """
    try:
        molecule = mdl_to_mol(mdl, sanitize=sanitize)
    except InvalidMdl:
        # Without a fallback value, the caller gets the original exception.
        if default is None:
            raise
        return default

    return mol_to_smiles(molecule, canonical=canonicalize)


def remove_hydrogens(mol: Mol) -> Mol:
"""
Remove unnecessary hydrogens in a molecule.
Expand Down
10 changes: 10 additions & 0 deletions src/rxn/chemutils/rdf/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .rdf_parser import RdfParser, iterate_reactions_from_file
from .rdf_reaction import RdfReaction
from .reaction_smiles_extractor import ReactionSmilesExtractor

__all__ = [
"RdfParser",
"RdfReaction",
"ReactionSmilesExtractor",
"iterate_reactions_from_file",
]
117 changes: 117 additions & 0 deletions src/rxn/chemutils/rdf/property_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import re
from typing import Any, Dict, Generator, List, Tuple

from rxn.utilities.regex import capturing

# Pattern for a key segment of the form "<name>(<index>)", such as "FOO(2)".
# The name and the digits are read back below as groups 1 and 2.
# NOTE(review): this relies on `capturing` wrapping its argument in a
# capturing group - confirm in rxn.utilities.regex.
_LIST_REGEX_STRING = capturing(".+") + r"\(" + capturing(r"\d+") + r"\)"
_LIST_REGEX = re.compile(_LIST_REGEX_STRING)


class PropertyParser:
    """
    To parse properties given in a RDF / MDL into a nested dictionary.

    Keys are split on ":". A non-final segment matching ``_LIST_REGEX``
    ("NAME(N)") addresses the N-th element (counting from 1) of a list of
    dictionaries stored under "NAME"; any other non-final segment addresses a
    nested dictionary. The final segment is always used verbatim as the
    dictionary key holding the value. The outcome accumulates in
    ``self.result``.
    """

    def __init__(self) -> None:
        self.result: Dict[str, Any] = {}

    def parse_dict(self, property_dict: Dict[str, str]) -> None:
        """Parse all the entries of ``property_dict`` into ``self.result``."""
        for key, value in property_dict.items():
            self.parse_property(key, value)

    def parse_property(self, key: str, value: str) -> None:
        """Parse one flat key / value pair into ``self.result``."""
        self._parse_property(self.result, key, value)

    def _is_list_property(self, subkey: str) -> bool:
        """Whether the key segment matches the "NAME(N)" list pattern."""
        return _LIST_REGEX.match(subkey) is not None

    def _parse_property(self, container: Dict[str, Any], key: str, value: str) -> None:
        """
        Store ``value`` in ``container`` under the (possibly nested) ``key``.

        Raises:
            ValueError: if the key (or what remains of it) is empty, or if a
                list segment uses the index 0.
        """
        if not key:
            raise ValueError("A key must be provided.")

        splits = key.split(":")
        if len(splits) == 1:
            # Final segment: plain assignment, no further nesting.
            container[key] = value
            return

        if self._is_list_property(splits[0]):
            self._parse_list_property(container, splits, value)
        else:
            self._parse_dict_property(container, splits, value)

    def _parse_list_property(
        self, container: Dict[str, Any], key_splits: List[str], value: str
    ) -> None:
        """Descend into the list element designated by the first key segment."""
        list_match = _LIST_REGEX.match(key_splits[0])
        if list_match is None:
            raise RuntimeError(
                "Not a list property - by construction, this should not happen."
            )

        subkey = list_match.group(1)
        list_index = int(list_match.group(2))

        element = self._list_element(container, subkey, list_index)
        self._parse_property(element, ":".join(key_splits[1:]), value)

    @staticmethod
    def _list_element(
        container: Dict[str, Any], subkey: str, list_index: int
    ) -> Dict[str, Any]:
        """
        Return the dict at the 1-based ``list_index`` of ``container[subkey]``.

        The list is created and padded with empty dicts when necessary.

        Raises:
            ValueError: if ``list_index`` is smaller than 1. Without this
                check, index 0 would be turned into the Python index -1 and
                either overwrite the last element or fail with an IndexError.
        """
        if list_index < 1:
            raise ValueError(
                f'List indices start at 1, but got {list_index} for "{subkey}".'
            )

        elements = container.setdefault(subkey, [])
        elements.extend({} for _ in range(len(elements), list_index))
        return elements[list_index - 1]

    def _parse_dict_property(
        self, container: Dict[str, Any], key_splits: List[str], value: str
    ) -> None:
        """Descend into the nested dict designated by the first key segment."""
        nested = container.setdefault(key_splits[0], {})
        self._parse_property(nested, ":".join(key_splits[1:]), value)


class PropertySerializer:
    """
    Flatten a nested property dictionary back to "a:b(1):c"-style keys.

    This does the reverse operation compared to PropertyParser.
    """

    def convert_dict(self, container: Dict[str, Any]) -> Dict[str, str]:
        """Return the flat key / value mapping for the nested ``container``."""
        return dict(self._convert_dict(prefix="", container=container))

    def _convert_dict(
        self, prefix: str, container: Dict[str, Any]
    ) -> Generator[Tuple[str, str], None, None]:
        """Yield the flat pairs for all the entries of ``container``."""
        for name, item in container.items():
            yield from self._convert(prefix=prefix, key=name, current=item)

    def _convert(
        self, prefix: str, key: str, current: Any
    ) -> Generator[Tuple[str, str], None, None]:
        """
        Yield the flat pairs for one entry.

        Strings are leaves; lists (of dicts) get a 1-based "(index)" suffix;
        dicts are descended into with a ":" separator.

        Raises:
            RuntimeError: for values that are neither str, list, nor dict.
        """
        path = f"{prefix}{key}"

        if isinstance(current, str):
            yield path, current
            return

        if isinstance(current, list):
            for position, element in enumerate(current, start=1):
                yield from self._convert_dict(
                    prefix=f"{path}({position}):", container=element
                )
            return

        if isinstance(current, dict):
            yield from self._convert_dict(prefix=f"{path}:", container=current)
            return

        raise RuntimeError(f"Not supported for property serialization: {current}")


def parse_properties(properties: Dict[str, str]) -> Dict[str, Any]:
    """
    Turn the flat properties of an RDF into a nested dictionary.

    Convenience wrapper around a one-off PropertyParser instance.
    """
    parser = PropertyParser()
    parser.parse_dict(properties)
    return parser.result


def serialize_properties(properties: Dict[str, Any]) -> Dict[str, str]:
    """
    Flatten a nested property dictionary (reverse of parse_properties).

    Convenience wrapper around a one-off PropertySerializer instance.
    """
    serializer = PropertySerializer()
    return serializer.convert_dict(properties)
201 changes: 201 additions & 0 deletions src/rxn/chemutils/rdf/rdf_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import re
from pathlib import Path
from typing import Callable, Iterator, List, Optional, Union

from rxn.utilities.regex import capturing

from .rdf_reaction import RdfReaction


class RdfParsingError(RuntimeError):
    """
    Exception for RDF parsing errors.

    Derives from RuntimeError, so that it is caught by handlers for it.
    """


class InvalidBlock(RdfParsingError):
    """
    Raised when a block of lines from an RDF file cannot be processed.

    The offending lines are kept in the ``lines`` attribute and are also
    included in the exception message.
    """

    def __init__(self, lines: List[str]):
        block_text = "\n".join(lines)
        super().__init__("Invalid block:\n" + block_text)
        self.lines = lines


class IncompleteReaction(RdfParsingError):
    """
    Exception for incomplete reaction in RDF file.

    Raised by ParsedReaction.to_reaction() when the reaction index or the
    precursor / product counts were never set.
    """


# NOTE(review): the patterns below are read back with match.group(1), which
# relies on `capturing` wrapping its argument in a capturing group - confirm
# in rxn.utilities.regex.

# "$" followed by word characters at the start of a block; group 1 is the
# block type without the "$" (compared against "RFMT", "RXN", "MOL", ...).
BLOCK_TYPE_REGEX = re.compile(r"\$" + capturing(r"\w+"))
# "$RFMT $RIREG <digits>" or "$RFMT $MIREG <digits>"; group 1 is the digits.
RIREG_REGEX = re.compile(r"\$RFMT \$[RM]IREG " + capturing(r"\d+"))
# "$DTYPE <text>"; group 1 is the remainder of the line.
DTYPE_REGEX = re.compile(r"\$DTYPE " + capturing(".*"))
# "$DATUM <text>"; group 1 is the remainder of the line.
DATUM_REGEX = re.compile(r"\$DATUM " + capturing(".*"))


class ParsedReaction:
    """
    Reaction under construction during parsing of an RDF file.

    Blocks of lines are fed one by one to ``handle_line_block``, which fills
    the attributes below. ``to_reaction`` then assembles the RdfReaction.
    """

    def __init__(self) -> None:
        # Index read from the "$RFMT" line.
        self.rireg: Optional[int] = None
        # Counts read from the last line of the "$RXN" block.
        self.n_precursors: Optional[int] = None
        self.n_products: Optional[int] = None
        # One string per "$MOL" block, in order of appearance.
        self.mols: List[str] = []
        # Contents of the "$DTYPE" / "$DATUM" blocks, in order of appearance.
        self.dtypes: List[str] = []
        self.datums: List[str] = []

    def handle_line_block(self, lines: List[str]) -> None:
        """
        Dispatch a block of lines to the handler for its type.

        Raises:
            InvalidBlock: if the block type cannot be determined, or if the
                handler rejects the block.
            ValueError: if the block type is not supported.
        """
        block_type = self.block_type(lines)
        handlers = {
            "RFMT": self.handle_rfmt,
            "RXN": self.handle_rxn,
            "MOL": self.handle_mol,
            "DTYPE": self.handle_dtype,
            "DATUM": self.handle_datum,
        }
        handler = handlers.get(block_type)
        if handler is None:
            raise ValueError(f"Invalid block type: {block_type}")
        handler(lines)

    def handle_rfmt(self, lines: List[str]) -> None:
        """Read the reaction index from a single-line "$RFMT" block."""
        if len(lines) != 1:
            raise InvalidBlock(lines)
        match = RIREG_REGEX.match(lines[0])
        if match is None:
            raise InvalidBlock(lines)
        self.rireg = int(match.group(1))

    def handle_rxn(self, lines: List[str]) -> None:
        """
        Read the precursor and product counts from a "$RXN" block.

        The block must have exactly five lines; the counts are the two
        whitespace-separated integers of the fifth one.

        Raises:
            InvalidBlock: if the block does not have five lines.
            ValueError: if the fifth line does not hold exactly two integers.
        """
        if len(lines) != 5:
            raise InvalidBlock(lines)

        self.n_precursors, self.n_products = [int(x) for x in lines[4].split()]

    def handle_mol(self, lines: List[str]) -> None:
        """Store a "$MOL" block, without its first line, as one string."""
        self.mols.append("\n".join(lines[1:]))

    def handle_dtype(self, lines: List[str]) -> None:
        """Store the name given in a single-line "$DTYPE" block."""
        if len(lines) != 1:
            raise InvalidBlock(lines)

        match = DTYPE_REGEX.match(lines[0])
        if match is None:
            raise InvalidBlock(lines)
        self.dtypes.append(match.group(1))

    def handle_datum(self, lines: List[str]) -> None:
        """
        Store the content of a (possibly multi-line) "$DATUM" block.

        The "$DATUM " prefix of the first line is removed. The list given by
        the caller is left untouched.
        """
        match = DATUM_REGEX.match(lines[0])
        if match is None:
            raise InvalidBlock(lines)

        # Work on a new list instead of overwriting the caller's first line.
        content = [match.group(1), *lines[1:]]

        # special case: leave out $MFMT, otherwise the property will not be
        # valid MolBlocks for parsing with RDKit.
        if content[0] == "$MFMT":
            content = content[1:]

        self.datums.append("\n".join(content))

    def block_type(self, lines: List[str]) -> str:
        """Get the type of a block: RXN, DTYPE, DATUM, etc."""
        match = BLOCK_TYPE_REGEX.match(lines[0])
        if match is None:
            raise InvalidBlock(lines)
        return match.group(1)

    def to_reaction(self) -> RdfReaction:
        """
        Assemble the RdfReaction from the blocks handled so far.

        The first ``n_precursors`` molecules are the reactants and the
        remaining ones the products; reagents are left empty. The metadata
        maps each "$DTYPE" to the "$DATUM" at the same position.

        Raises:
            IncompleteReaction: if no "$RFMT" or "$RXN" block was handled.
            RdfParsingError: if the number of molecules does not match the
                announced counts, or if the numbers of "$DTYPE" and "$DATUM"
                blocks differ.
        """
        if self.rireg is None or self.n_precursors is None or self.n_products is None:
            raise IncompleteReaction(
                "The reaction index or the molecule counts are missing."
            )

        n_expected = self.n_precursors + self.n_products
        if len(self.mols) != n_expected:
            raise RdfParsingError(
                f"Reaction {self.rireg}: expected {n_expected} molecules, "
                f"but got {len(self.mols)}."
            )

        precursors = self.mols[: self.n_precursors]
        # Slice from the front: "self.mols[-self.n_products:]" would return
        # ALL the molecules when there are zero products, since -0 == 0.
        products = self.mols[self.n_precursors :]

        if len(self.dtypes) != len(self.datums):
            raise RdfParsingError(
                f"Reaction {self.rireg}: {len(self.dtypes)} $DTYPE blocks, "
                f"but {len(self.datums)} $DATUM blocks."
            )

        meta = dict(zip(self.dtypes, self.datums))

        return RdfReaction(
            reactants=precursors,
            reagents=[],
            products=products,
            meta=meta,
            reaction_index=self.rireg,
        )


class RdfParser:
    """
    Custom parser for RDF files.

    Iterating over an instance reads the file lazily and yields one
    RdfReaction per reaction (same as ``iter_reactions``).
    """

    def __init__(self, filename: Union[Path, str], encoding: str = "latin-1"):
        """
        Args:
            filename: path to the RDF file to read.
            encoding: file encoding. Defaults to latin-1 because Thieme has such
                an encoding for several files.
        """
        self.filename = filename
        self.encoding = encoding

    def __iter__(self) -> Iterator[RdfReaction]:
        yield from self.iter_reactions()

    def iter_reactions(self) -> Iterator[RdfReaction]:
        """
        Iterate over the reactions of the file.

        The first two blocks of the file (RDFILE and DATM lines) are skipped.
        After that, every block starting with "$RFMT" opens a new reaction.

        Raises:
            RdfParsingError: if the file has fewer than two blocks, if a block
                comes before the first "$RFMT" block, or if a reaction cannot
                be assembled. Other exceptions from the block handlers
                propagate unchanged.
        """
        block_iterator = self.iter_blocks()

        # Consume the line with RDFILE, then the line with DATM. A bare next()
        # here would raise StopIteration inside the generator on an empty or
        # truncated file, which Python turns into an opaque
        # "RuntimeError: generator raised StopIteration".
        for header in ("$RDFILE", "$DATM"):
            if next(block_iterator, None) is None:
                raise RdfParsingError(
                    f"Unexpected end of file: no {header} block in {self.filename}"
                )

        current_reaction = None
        for lines in block_iterator:
            if lines[0].startswith("$RFMT"):
                # A new reaction started.
                # We yield the current one before initializing the new one
                if current_reaction is not None:
                    yield current_reaction.to_reaction()
                current_reaction = ParsedReaction()

            if current_reaction is None:
                # RdfParsingError is a RuntimeError, as raised here before.
                raise RdfParsingError("No reaction block started")

            current_reaction.handle_line_block(lines)

        # yield the last reaction
        if current_reaction is not None:
            yield current_reaction.to_reaction()

    def iter_blocks(self) -> Iterator[List[str]]:
        """
        Iterate over the blocks of lines of the file.

        A new block starts at every line beginning with "$". Lines are
        stripped of their trailing newline. Lines coming before the first "$"
        line, if any, form a block of their own.
        """
        current_line_block: List[str] = []
        with open(self.filename, "rt", encoding=self.encoding) as f:
            for line in f:
                line = line.rstrip("\n")

                if line.startswith("$"):
                    if current_line_block:
                        yield current_line_block
                    current_line_block = []

                current_line_block.append(line)

        # Last line block at the end of the file
        if current_line_block:
            yield current_line_block


def iterate_reactions_from_file(
    filename: Union[Path, str],
    filter_fn: Optional[Callable[[RdfReaction], bool]] = None,
) -> Iterator[RdfReaction]:
    """
    Lazily iterate over the reactions of an RDF file.

    Args:
        filename: path to the RDF file, read with the default RdfParser
            settings.
        filter_fn: optional predicate; when given, only the reactions for
            which it returns a truthy value are yielded.
    """
    all_reactions = RdfParser(filename).iter_reactions()

    if filter_fn is None:
        yield from all_reactions
        return

    for candidate in all_reactions:
        if filter_fn(candidate):
            yield candidate
Loading

0 comments on commit ccbd55e

Please sign in to comment.