Skip to content

Commit

Permalink
Merge pull request #257 from tangkong/perf_cli2
Browse files Browse the repository at this point in the history
PERF: refactor atef.bin cli entrypoint to defer imports (try 2)
  • Loading branch information
tangkong authored Sep 17, 2024
2 parents ef67fc4 + 225ede3 commit 87cba02
Show file tree
Hide file tree
Showing 11 changed files with 978 additions and 901 deletions.
475 changes: 15 additions & 460 deletions atef/bin/check.py

Large diffs are not rendered by default.

460 changes: 460 additions & 0 deletions atef/bin/check_main.py

Large diffs are not rendered by default.

32 changes: 4 additions & 28 deletions atef/bin/config.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
"""
`atef config` opens up a graphical config file editor.
"""
import argparse
import logging
import sys
from typing import List, Optional

from pydm import exception
from qtpy.QtWidgets import QApplication, QStyleFactory

from ..type_hints import AnyPath
from ..widgets.config.window import Window

logger = logging.getLogger(__name__)
from qtpy.QtWidgets import QStyleFactory


def build_arg_parser(argparser=None):
Expand Down Expand Up @@ -77,18 +65,6 @@ def build_arg_parser(argparser=None):
return argparser


def main(cache_size: int, filenames: Optional[List[AnyPath]] = None, **kwargs):
app = QApplication(sys.argv)
main_window = Window(cache_size=cache_size, show_welcome=not filenames)
main_window.show()
exception.install()

for filename in filenames or []:
try:
main_window.open_file(filename=filename)
except FileNotFoundError:
logger.error(
"File specified on the command-line not found: %s", filename
)

app.exec()
def main(*args, **kwargs):
from atef.bin.config_main import main
main(*args, **kwargs)
31 changes: 31 additions & 0 deletions atef/bin/config_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
`atef config` opens up a graphical config file editor.
"""
import logging
import sys
from typing import List, Optional

from pydm import exception
from qtpy.QtWidgets import QApplication

from ..type_hints import AnyPath
from ..widgets.config.window import Window

logger = logging.getLogger(__name__)


def main(cache_size: int, filenames: Optional[List[AnyPath]] = None, **kwargs):
app = QApplication(sys.argv)
main_window = Window(cache_size=cache_size, show_welcome=not filenames)
main_window.show()
exception.install()

for filename in filenames or []:
try:
main_window.open_file(filename=filename)
except FileNotFoundError:
logger.error(
"File specified on the command-line not found: %s", filename
)

app.exec()
2 changes: 2 additions & 0 deletions atef/bin/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def gather_scripts() -> Dict[str, Tuple[Callable, Callable]]:
scripts_module = importlib.import_module("atef.scripts")
for sub_module in iter_modules(scripts_module.__path__):
module_name = sub_module.name
if "_main" in module_name:
continue
try:
module = importlib.import_module(f".{module_name}", "atef.scripts")
except Exception as ex:
Expand Down
280 changes: 4 additions & 276 deletions atef/scripts/converter_v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,272 +2,12 @@
This script will convert a prototype atef configuration file to the latest
supported (and numbered) version.
"""

from __future__ import annotations

import argparse
import json
import logging
import pathlib
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Union, cast

import apischema
import yaml

import atef
import atef.config
from atef import serialization, tools
from atef.check import Comparison
from atef.type_hints import AnyPath

logger = logging.getLogger(__name__)
DESCRIPTION = __doc__


@dataclass
class IdentifierAndComparison:
"""
Set of identifiers (IDs) and comparisons to perform on those identifiers.
"""

#: An optional identifier for this set.
name: Optional[str] = None
#: PV name, attribute name, or test-specific identifier.
ids: List[str] = field(default_factory=list)
#: The comparisons to perform for *each* of the ids.
comparisons: List[Comparison] = field(default_factory=list)


@dataclass
@serialization.as_tagged_union
class Configuration:
"""
Configuration base class for shared settings between all configurations.
Subclasses of Comparison will be serialized as a tagged union. This means
that the subclass name will be used as an identifier for the generated
serialized dictionary (and JSON object).
"""

#: Name tied to this configuration.
name: Optional[str] = None
#: Description tied to this configuration.
description: Optional[str] = None
#: Tags tied to this configuration.
tags: Optional[List[str]] = None
#: Comparison checklist for this configuration.
checklist: List[IdentifierAndComparison] = field(default_factory=list)


@dataclass
class DeviceConfiguration(Configuration):
"""
A configuration that is built to check one or more devices.
Identifiers are by default assumed to be attribute (component) names of the
devices. Identifiers may refer to components on the device
(``"component"`` would mean to access each device's ``.component``) or may
refer to any level of sub-device components (``"sub_device.component"``
would mean to access each device's ``.sub_device`` and that sub-device's
``.a`` component).
"""

#: Happi device names which give meaning to self.checklist[].ids.
devices: List[str] = field(default_factory=list)


@dataclass
class PVConfiguration(Configuration):
"""
A configuration that is built to check live EPICS PVs.
Identifiers are by default assumed to be PV names.
"""

...


@dataclass
class ToolConfiguration(Configuration):
"""
A configuration unrelated to PVs or Devices which verifies status via some
tool.
Comparisons can optionally be run on the tool's results.
"""

tool: tools.Tool = field(default_factory=tools.Ping)


AnyConfiguration = Union[
PVConfiguration,
DeviceConfiguration,
ToolConfiguration,
]
PathItem = Union[
AnyConfiguration,
IdentifierAndComparison,
Comparison,
str,
]


@dataclass
class PrototypeConfigurationFile:
#: configs: PVConfiguration, DeviceConfiguration, or ToolConfiguration.
configs: List[Configuration]

@classmethod
def from_file(cls, filename: AnyPath) -> PrototypeConfigurationFile:
"""Load a configuration file from JSON or yaml."""
filename = pathlib.Path(filename)
if filename.suffix.lower() in (".yml", ".yaml"):
return cls.from_yaml(filename)
return cls.from_json(filename)

@classmethod
def from_json(cls, filename: AnyPath) -> PrototypeConfigurationFile:
"""Load a configuration file from JSON."""
with open(filename) as fp:
serialized_config = json.load(fp)
return apischema.deserialize(cls, serialized_config)

@classmethod
def from_yaml(cls, filename: AnyPath) -> PrototypeConfigurationFile:
"""Load a configuration file from yaml."""
with open(filename) as fp:
serialized_config = yaml.safe_load(fp)
return apischema.deserialize(cls, serialized_config)


def _split_shared_checklist(
checklist: List[IdentifierAndComparison],
) -> Tuple[List[Comparison], Dict[str, List[Comparison]]]:
"""
Split a prototype "checklist", consisting of pairs of identifiers and
comparisons into the new format of "shared" and "per-identifier" (i.e.,
pv/attr) comparisons.
Parameters
----------
checklist : List[IdentifierAndComparison]
The prototype checklist.
Returns
-------
List[Comparison]
Shared comparisons.
Dict[str, List[Comparison]]
Per-identifier comparisons, with the identifier as the key.
"""
shared = []
by_identifier = {}
if len(checklist) == 1:
# If there is only one checklist, the comparisons can be considered
# "shared".
for check in checklist:
for comparison in check.comparisons:
shared.append(comparison)
for identifier in check.ids:
by_identifier.setdefault(identifier, [])
else:
# Otherwise, comparisons from every checklist will become
# per-identifier.
for check in checklist:
for comparison in check.comparisons:
for identifier in check.ids:
by_identifier.setdefault(identifier, []).append(comparison)
return shared, by_identifier


def convert_configuration(config: AnyConfiguration) -> atef.config.AnyConfiguration:
"""
Convert a prototype Configuration to a supported one.
Parameters
----------
config : AnyConfiguration
The old prototype configuration.
Returns
-------
atef.config.AnyConfiguration
The new and supported configuration.
"""
if not isinstance(config, (DeviceConfiguration, PVConfiguration, ToolConfiguration)):
raise ValueError(f"Unexpected and unsupported config type: {type(config)}")

shared, by_identifier = _split_shared_checklist(config.checklist)
if isinstance(config, DeviceConfiguration):
return atef.config.DeviceConfiguration(
name=config.name,
description=config.description,
tags=config.tags,
devices=config.devices,
by_attr=by_identifier,
shared=shared,
)

if isinstance(config, PVConfiguration):
return atef.config.PVConfiguration(
name=config.name,
description=config.description,
tags=config.tags,
by_pv=by_identifier,
shared=shared,
)

if isinstance(config, ToolConfiguration):
return atef.config.ToolConfiguration(
name=config.name,
description=config.description,
tags=config.tags,
tool=config.tool,
shared=shared,
by_attr=by_identifier,
)


def load(filename: AnyPath) -> atef.config.ConfigurationFile:
"""
Load the provided prototype atef configuration file to the latest
supported (and numbered) version.
Parameters
----------
filename : AnyPath
The filename to open.
Returns
-------
atef.config.ConfigurationFile
The converted configuration file.
"""
old = PrototypeConfigurationFile.from_file(filename)
new = atef.config.ConfigurationFile()
for config in old.configs:
config = cast(AnyConfiguration, config)
new.root.configs.append(convert_configuration(config))
return new


def convert(fn: AnyPath) -> str:
"""
Convert the provided prototype atef configuration file, returning JSON
to be saved.
Parameters
----------
filename : AnyPath
The filename to open.
Returns
-------
str
The new file contents.
"""
return json.dumps(load(fn).to_json(), indent=2)
DESCRIPTION = __doc__


def build_arg_parser(argparser=None) -> argparse.ArgumentParser:
Expand Down Expand Up @@ -303,21 +43,9 @@ def build_arg_parser(argparser=None) -> argparse.ArgumentParser:
return argparser


def main(
filename: str,
write: bool
):
for filename in filename:
converted = convert(filename)

if write:
logger.warning("Overwriting converted file: %s", filename)
with open(filename, "wt") as fp:
print(converted, file=fp)
else:
print(f"-- {filename} --")
print(converted)
print()
def main(*args, **kwargs):
from atef.scripts.converter_v0_main import main
main(*args, **kwargs)


def main_script(args=None) -> None:
Expand Down
Loading

0 comments on commit 87cba02

Please sign in to comment.