Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
ottowayi committed Dec 7, 2021
2 parents d551f3d + ff6ca1e commit f847dd6
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 98 deletions.
67 changes: 23 additions & 44 deletions pycomm3/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import ipaddress
from io import BytesIO
from typing import Any, Type, Dict, Tuple, Union
from typing import Any, Type, Dict, Tuple, Union, Set

from .cip import (
DataType,
Expand Down Expand Up @@ -157,9 +157,7 @@ def _decode(cls, stream: BytesIO):

StructTemplateAttributes = Struct(
UINT("count"),
Struct(UINT("attr_num"), UINT("status"), UDINT("size"))(
name="object_definition_size"
),
Struct(UINT("attr_num"), UINT("status"), UDINT("size"))(name="object_definition_size"),
Struct(UINT("attr_num"), UINT("status"), UDINT("size"))(name="structure_size"),
Struct(UINT("attr_num"), UINT("status"), UINT("count"))(name="member_count"),
Struct(UINT("attr_num"), UINT("status"), UINT("handle"))(name="structure_handle"),
Expand All @@ -169,33 +167,30 @@ def _decode(cls, stream: BytesIO):
class _StructTagReprMeta(_StructReprMeta):
def __repr__(cls):
members = ", ".join(repr(m) for m in cls.members)
return f"{cls.__name__}({members}, bool_members={cls.bits!r}, host_members={cls.hosts!r}, struct_size={cls.size!r})"
return f"{cls.__name__}({members}, bool_members={cls.bits!r}, struct_size={cls.size!r})" # TODO


def StructTag(
*members,
bool_members: Dict[str, Tuple[str, int]],
host_members: Dict[str, Type[DataType]],
# (datatype, offset) of each member of the struct, does not include bit members aliased to other members
*members: Tuple[DataType, int],
bit_members: Dict[str, Tuple[int, int]], # {member name, (offset, bit #) }
private_members: Set[str], # private members that should not be in the final value
struct_size: int,
) -> Type[StructType]:
"""
bool_members = {member name: (host member, bit)}
"""

_members = [x[0] for x in members]
_offsets_ = {member: offset for (member, offset) in members}
_struct = Struct(*_members)

class StructTag(_struct, metaclass=_StructTagReprMeta):
bits = bool_members
hosts = host_members
bits = bit_members
private = private_members
size = struct_size
_offsets = _offsets_

@classmethod
def _decode(cls, stream: BytesIO):
stream = BytesIO(stream.read(cls.size))
raw = stream.getvalue()
values = {}

for member in cls.members:
Expand All @@ -204,49 +199,33 @@ def _decode(cls, stream: BytesIO):
stream.read(offset - stream.tell())
values[member.name] = member.decode(stream)

hosts = set()

for bit_member, (host_member, bit) in cls.bits.items():
host_value = values[host_member]
if cls.hosts[host_member] == DWORD:
bit_value = host_value[bit]
else:
bit_value = bool(host_value & (1 << bit))

for bit_member, (offset, bit) in cls.bits.items():
bit_value = bool(raw[offset] & (1 << bit))
values[bit_member] = bit_value
hosts.add(host_member)

return {k: v for k, v in values.items() if k not in hosts}
return {k: v for k, v in values.items() if k not in cls.private}

@classmethod
def _encode(cls, values: Dict[str, Any]):
# make a copy so that private host members aren't added to the original
values = {k: v for k, v in values.items()}

for host, host_type in cls.hosts.items():
if host_type == DWORD:
values[host] = [
False,
] * 32
else:
values[host] = 0

for bit_member, (host_member, bit) in cls.bits.items():
val = values[bit_member]
if cls.hosts[host_member] == DWORD:
values[host_member][bit] = bool(val)
else:
if val:
values[host_member] |= 1 << bit
else:
values[host_member] &= ~(1 << bit)

value = bytearray(cls.size)
for member in cls.members:
if member.name in cls.private:
continue
offset = cls._offsets[member]
encoded = member.encode(values[member.name])
value[offset : offset + len(encoded)] = encoded

for bit_member, (offset, bit) in cls.bits.items():
val = values[bit_member]

if val:
value[offset] |= 1 << bit
else:
value[offset] &= ~(1 << bit)

return value

return StructTag
72 changes: 18 additions & 54 deletions pycomm3/logix_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ def _parse_template_data(self, data, template, symbol_type):
template_name = None
try:
for name in (
x.decode(errors="replace") for x in data[info_len:].split(b"\x00") if len(x)
x.decode(errors="replace") for x in data[info_len:].split(b"\x00")
):
if template_name is None and ";" in name:
template_name, _ = name.split(";", maxsplit=1)
Expand Down Expand Up @@ -795,66 +795,30 @@ def _parse_template_data(self, data, template, symbol_type):

_struct_members = []
_bit_members = {}
_host_members = {} # {offset: (member name, type)}
_multibyte_hosts = {} # hosts for bits that are larger than sints
_private_members = set()
_unk_member_count = 0
for member, info in zip(member_names, member_data):
if (member.startswith("ZZZZZZZZZZ") or member.startswith("__")) or (
predefine and member in {"CTL", "Control"}
if not member: # handle unnamed private members
member = f'__unknown{_unk_member_count}' # double-underscore makes it 'private'
_unk_member_count += 1
if (
member.startswith("ZZZZZZZZZZ") or
member.startswith("__") or
(predefine and member in {"CTL", "Control"})
):
_host_members[info["offset"]] = (member, info["type_class"])
if info["type_class"].size > USINT.size:
# for host members larger than sints, store what the individual bytes offsets
# would be and use them to update the bit numbers later on for the bool methods
# that way we don't have to mess with changing the offset sizes
# {offset: (member_name, offset in host member, offset in packet)}
_multibyte_hosts.update(
{
info["offset"] + i: (member, i, info["offset"])
for i in range(info["type_class"].size)
}
)

_private_members.add(member)
else:
data_type["attributes"].append(member)

data_type["internal_tags"][member] = info

if info["data_type_name"] == "BOOL":
if predefine:
if 0 in _host_members and _host_members[0][0] in {"CTL", "Control"}:
# for most predefined types, we assume all 'offsets' refer to the hidden CTL attribute
if info["offset"] in _multibyte_hosts:
_, host_offset, packet_offset = _multibyte_hosts[info["offset"]]
# adjust the bit number by the byte offset within the host member
# but use the packet offset aka host member's offset to map the bit member to the host
_bit_members[member] = (
_host_members[packet_offset][0],
info["bit"] + 8 * host_offset,
)
else:
_bit_members[member] = (_host_members[0][0], info["bit"])
else:
# some predefined types don't list their host members, at least ANALOG_ALARM doesn't
# but if trying to access the whole struct for ANALOG_ALARM leads to a 'permission denied'
# error, so this isn't used anywhere yet and creates a broken StructTag type
# but leaving it in case might find use for it in the future
_bit_members[member] = (info["offset"], info["bit"])

elif info["offset"] in _host_members:
_bit_members[member] = (_host_members[info["offset"]][0], info["bit"])
elif info["offset"] in _multibyte_hosts:
_, host_offset, packet_offset = _multibyte_hosts[info["offset"]]
# adjust the bit number by the byte offset within the host member
# but use the packet offset aka host member's offset to map the bit member to the host
_bit_members[member] = (
_host_members[packet_offset][0],
info["bit"] + 8 * host_offset,
)
else:
_struct_members.append((info["type_class"](member), info["offset"]))
if info["data_type_name"] == "BOOL" and 'bit' in info:
# bit members aren't really 'struct' members since they are aliased to bits of other members
_bit_members[member] = (info['offset'], info['bit'])
else:
_struct_members.append((info["type_class"](member), info["offset"]))

if (
if ( # determine if struct is a string or not
data_type["attributes"] == ["LEN", "DATA"]
and data_type["internal_tags"]["DATA"]["data_type_name"] == "SINT"
and data_type["internal_tags"]["DATA"].get("array")
Expand All @@ -866,9 +830,9 @@ def _parse_template_data(self, data, template, symbol_type):
data_type["_struct_members"] = (_struct_members, _bit_members)
data_type["type_class"] = StructTag(
*_struct_members,
bool_members=_bit_members,
bit_members=_bit_members,
struct_size=template["structure_size"],
host_members={m: t for m, t in _host_members.values()},
private_members=_private_members,
)

self.__log.debug(f"Completed parsing template as data type {data_type!r}")
Expand Down

0 comments on commit f847dd6

Please sign in to comment.