Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF: refactor atef.bin cli entrypoint to defer imports (try 2) #257

Merged
merged 2 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
475 changes: 15 additions & 460 deletions atef/bin/check.py

Large diffs are not rendered by default.

460 changes: 460 additions & 0 deletions atef/bin/check_main.py

Large diffs are not rendered by default.

32 changes: 4 additions & 28 deletions atef/bin/config.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
"""
`atef config` opens up a graphical config file editor.
"""
import argparse
import logging
import sys
from typing import List, Optional

from pydm import exception
from qtpy.QtWidgets import QApplication, QStyleFactory

from ..type_hints import AnyPath
from ..widgets.config.window import Window

logger = logging.getLogger(__name__)
from qtpy.QtWidgets import QStyleFactory


def build_arg_parser(argparser=None):
Expand Down Expand Up @@ -77,18 +65,6 @@ def build_arg_parser(argparser=None):
return argparser


def main(cache_size: int, filenames: Optional[List[AnyPath]] = None, **kwargs):
app = QApplication(sys.argv)
main_window = Window(cache_size=cache_size, show_welcome=not filenames)
main_window.show()
exception.install()

for filename in filenames or []:
try:
main_window.open_file(filename=filename)
except FileNotFoundError:
logger.error(
"File specified on the command-line not found: %s", filename
)

app.exec()
def main(*args, **kwargs):
from atef.bin.config_main import main
main(*args, **kwargs)
31 changes: 31 additions & 0 deletions atef/bin/config_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
`atef config` opens up a graphical config file editor.
"""
import logging
import sys
from typing import List, Optional

from pydm import exception
from qtpy.QtWidgets import QApplication

from ..type_hints import AnyPath
from ..widgets.config.window import Window

logger = logging.getLogger(__name__)


def main(cache_size: int, filenames: Optional[List[AnyPath]] = None, **kwargs):
app = QApplication(sys.argv)
main_window = Window(cache_size=cache_size, show_welcome=not filenames)
main_window.show()
exception.install()

for filename in filenames or []:
try:
main_window.open_file(filename=filename)
except FileNotFoundError:
logger.error(
"File specified on the command-line not found: %s", filename
)

app.exec()
2 changes: 2 additions & 0 deletions atef/bin/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def gather_scripts() -> Dict[str, Tuple[Callable, Callable]]:
scripts_module = importlib.import_module("atef.scripts")
for sub_module in iter_modules(scripts_module.__path__):
module_name = sub_module.name
if "_main" in module_name:
continue
try:
module = importlib.import_module(f".{module_name}", "atef.scripts")
except Exception as ex:
Expand Down
280 changes: 4 additions & 276 deletions atef/scripts/converter_v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,272 +2,12 @@
This script will convert a prototype atef configuration file to the latest
supported (and numbered) version.
"""

from __future__ import annotations

import argparse
import json
import logging
import pathlib
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Union, cast

import apischema
import yaml

import atef
import atef.config
from atef import serialization, tools
from atef.check import Comparison
from atef.type_hints import AnyPath

logger = logging.getLogger(__name__)
DESCRIPTION = __doc__


@dataclass
class IdentifierAndComparison:
"""
Set of identifiers (IDs) and comparisons to perform on those identifiers.
"""

#: An optional identifier for this set.
name: Optional[str] = None
#: PV name, attribute name, or test-specific identifier.
ids: List[str] = field(default_factory=list)
#: The comparisons to perform for *each* of the ids.
comparisons: List[Comparison] = field(default_factory=list)


@dataclass
@serialization.as_tagged_union
class Configuration:
"""
Configuration base class for shared settings between all configurations.

Subclasses of Comparison will be serialized as a tagged union. This means
that the subclass name will be used as an identifier for the generated
serialized dictionary (and JSON object).
"""

#: Name tied to this configuration.
name: Optional[str] = None
#: Description tied to this configuration.
description: Optional[str] = None
#: Tags tied to this configuration.
tags: Optional[List[str]] = None
#: Comparison checklist for this configuration.
checklist: List[IdentifierAndComparison] = field(default_factory=list)


@dataclass
class DeviceConfiguration(Configuration):
"""
A configuration that is built to check one or more devices.

Identifiers are by default assumed to be attribute (component) names of the
devices. Identifiers may refer to components on the device
(``"component"`` would mean to access each device's ``.component``) or may
refer to any level of sub-device components (``"sub_device.component"``
would mean to access each device's ``.sub_device`` and that sub-device's
``.a`` component).
"""

#: Happi device names which give meaning to self.checklist[].ids.
devices: List[str] = field(default_factory=list)


@dataclass
class PVConfiguration(Configuration):
"""
A configuration that is built to check live EPICS PVs.

Identifiers are by default assumed to be PV names.
"""

...


@dataclass
class ToolConfiguration(Configuration):
"""
A configuration unrelated to PVs or Devices which verifies status via some
tool.

Comparisons can optionally be run on the tool's results.
"""

tool: tools.Tool = field(default_factory=tools.Ping)


AnyConfiguration = Union[
PVConfiguration,
DeviceConfiguration,
ToolConfiguration,
]
PathItem = Union[
AnyConfiguration,
IdentifierAndComparison,
Comparison,
str,
]


@dataclass
class PrototypeConfigurationFile:
#: configs: PVConfiguration, DeviceConfiguration, or ToolConfiguration.
configs: List[Configuration]

@classmethod
def from_file(cls, filename: AnyPath) -> PrototypeConfigurationFile:
"""Load a configuration file from JSON or yaml."""
filename = pathlib.Path(filename)
if filename.suffix.lower() in (".yml", ".yaml"):
return cls.from_yaml(filename)
return cls.from_json(filename)

@classmethod
def from_json(cls, filename: AnyPath) -> PrototypeConfigurationFile:
"""Load a configuration file from JSON."""
with open(filename) as fp:
serialized_config = json.load(fp)
return apischema.deserialize(cls, serialized_config)

@classmethod
def from_yaml(cls, filename: AnyPath) -> PrototypeConfigurationFile:
"""Load a configuration file from yaml."""
with open(filename) as fp:
serialized_config = yaml.safe_load(fp)
return apischema.deserialize(cls, serialized_config)


def _split_shared_checklist(
checklist: List[IdentifierAndComparison],
) -> Tuple[List[Comparison], Dict[str, List[Comparison]]]:
"""
Split a prototype "checklist", consisting of pairs of identifiers and
comparisons into the new format of "shared" and "per-identifier" (i.e.,
pv/attr) comparisons.

Parameters
----------
checklist : List[IdentifierAndComparison]
The prototype checklist.

Returns
-------
List[Comparison]
Shared comparisons.
Dict[str, List[Comparison]]
Per-identifier comparisons, with the identifier as the key.
"""
shared = []
by_identifier = {}
if len(checklist) == 1:
# If there is only one checklist, the comparisons can be considered
# "shared".
for check in checklist:
for comparison in check.comparisons:
shared.append(comparison)
for identifier in check.ids:
by_identifier.setdefault(identifier, [])
else:
# Otherwise, comparisons from every checklist will become
# per-identifier.
for check in checklist:
for comparison in check.comparisons:
for identifier in check.ids:
by_identifier.setdefault(identifier, []).append(comparison)
return shared, by_identifier


def convert_configuration(config: AnyConfiguration) -> atef.config.AnyConfiguration:
"""
Convert a prototype Configuration to a supported one.

Parameters
----------
config : AnyConfiguration
The old prototype configuration.

Returns
-------
atef.config.AnyConfiguration
The new and supported configuration.
"""
if not isinstance(config, (DeviceConfiguration, PVConfiguration, ToolConfiguration)):
raise ValueError(f"Unexpected and unsupported config type: {type(config)}")

shared, by_identifier = _split_shared_checklist(config.checklist)
if isinstance(config, DeviceConfiguration):
return atef.config.DeviceConfiguration(
name=config.name,
description=config.description,
tags=config.tags,
devices=config.devices,
by_attr=by_identifier,
shared=shared,
)

if isinstance(config, PVConfiguration):
return atef.config.PVConfiguration(
name=config.name,
description=config.description,
tags=config.tags,
by_pv=by_identifier,
shared=shared,
)

if isinstance(config, ToolConfiguration):
return atef.config.ToolConfiguration(
name=config.name,
description=config.description,
tags=config.tags,
tool=config.tool,
shared=shared,
by_attr=by_identifier,
)


def load(filename: AnyPath) -> atef.config.ConfigurationFile:
"""
Load the provided prototype atef configuration file to the latest
supported (and numbered) version.

Parameters
----------
filename : AnyPath
The filename to open.

Returns
-------
atef.config.ConfigurationFile
The converted configuration file.
"""
old = PrototypeConfigurationFile.from_file(filename)
new = atef.config.ConfigurationFile()
for config in old.configs:
config = cast(AnyConfiguration, config)
new.root.configs.append(convert_configuration(config))
return new


def convert(fn: AnyPath) -> str:
"""
Convert the provided prototype atef configuration file, returning JSON
to be saved.

Parameters
----------
filename : AnyPath
The filename to open.

Returns
-------
str
The new file contents.
"""
return json.dumps(load(fn).to_json(), indent=2)
DESCRIPTION = __doc__


def build_arg_parser(argparser=None) -> argparse.ArgumentParser:
Expand Down Expand Up @@ -303,21 +43,9 @@ def build_arg_parser(argparser=None) -> argparse.ArgumentParser:
return argparser


def main(
filename: str,
write: bool
):
for filename in filename:
converted = convert(filename)

if write:
logger.warning("Overwriting converted file: %s", filename)
with open(filename, "wt") as fp:
print(converted, file=fp)
else:
print(f"-- {filename} --")
print(converted)
print()
def main(*args, **kwargs):
from atef.scripts.converter_v0_main import main
main(*args, **kwargs)


def main_script(args=None) -> None:
Expand Down
Loading