Skip to content

Commit

Permalink
✅ added tests for AOIs
Browse files Browse the repository at this point in the history
  • Loading branch information
ottowayi committed Nov 2, 2021
1 parent 431f160 commit ab8f92e
Show file tree
Hide file tree
Showing 5 changed files with 1,754 additions and 158 deletions.
3 changes: 3 additions & 0 deletions pycomm3/logix_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,9 @@ def _send_read_fragmented(
self._sequence, request, offset
)
else:
if response.error:
self.__log.error(f"Fragment failed with error: {response.error}")

offset = None

if all(responses):
Expand Down
34 changes: 9 additions & 25 deletions pycomm3/packets/logix.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ def __init__(
self.request_path = None

def tag_only_message(self):
return b"".join(
(self.tag_service, self.request_path, UINT.encode(self.elements))
)
return b"".join((self.tag_service, self.request_path, UINT.encode(self.elements)))


class ReadTagResponsePacket(TagServiceResponsePacket):
Expand Down Expand Up @@ -107,20 +105,16 @@ class ReadTagRequestPacket(TagServiceRequestPacket):
def _setup_message(self):
super()._setup_message()
if self.request_path is None:
self.request_path = tag_request_path(
self.tag, self.tag_info, self._use_instance_id
)
self.request_path = tag_request_path(self.tag, self.tag_info, self._use_instance_id)
if self.request_path is None:
self.error = f"Failed to build request path for tag"
self._error = "Failed to build request path for tag"
self._msg.append(self.tag_only_message())


class ReadTagFragmentedResponsePacket(ReadTagResponsePacket):
__log = logging.getLogger(f"{__module__}.{__qualname__}")

def __init__(
self, request: "ReadTagFragmentedRequestPacket", raw_data: bytes = None
):
def __init__(self, request: "ReadTagFragmentedRequestPacket", raw_data: bytes = None):
self.value = None
self._data_type = None
self.value_bytes = None
Expand Down Expand Up @@ -201,9 +195,7 @@ def from_request(
return new_request

def __repr__(self):
return (
f"{self.__class__.__name__}(tag={self.tag!r}, elements={self.elements!r})"
)
return f"{self.__class__.__name__}(tag={self.tag!r}, elements={self.elements!r})"


class WriteTagResponsePacket(TagServiceResponsePacket):
Expand Down Expand Up @@ -251,9 +243,7 @@ def __init__(
def _setup_message(self):
super()._setup_message()
if self.request_path is None:
self.request_path = tag_request_path(
self.tag, self.tag_info, self._use_instance_id
)
self.request_path = tag_request_path(self.tag, self.tag_info, self._use_instance_id)
if self.request_path is None:
self.error = f"Failed to build request path for tag"
self._msg.append(self.tag_only_message())
Expand Down Expand Up @@ -368,9 +358,7 @@ def __init__(
self._mask_size = DataTypes.get(self.data_type).size

if self._mask_size is None:
raise RequestError(
f'Invalid data type {tag_info["data_type"]} for writing bits'
)
raise RequestError(f'Invalid data type {tag_info["data_type"]} for writing bits')

if self.request_path is None:
self.error = "Failed to create request path for tag"
Expand Down Expand Up @@ -414,9 +402,7 @@ def _parse_reply(self):
super()._parse_reply()
num_replies = UINT.decode(self.data)
offset_data = self.data[2 : 2 + 2 * num_replies]
offsets = (
UINT.decode(offset_data[i : i + 2]) for i in range(0, len(offset_data), 2)
)
offsets = (UINT.decode(offset_data[i : i + 2]) for i in range(0, len(offset_data), 2))
start, end = tee(offsets) # split offsets into start/end indexes
next(end) # advance end by 1 so 2nd item is the end index for the first item
reply_data = [self.data[i:j] for i, j in zip_longest(start, end)]
Expand All @@ -429,9 +415,7 @@ def _parse_reply(self):
self.responses.append(response)

def __repr__(self):
return (
f"{self.__class__.__name__}(values={_r(self.values)}, error={self.error!r})"
)
return f"{self.__class__.__name__}(values={_r(self.values)}, error={self.error!r})"


class MultiServiceRequestPacket(SendUnitDataRequestPacket):
Expand Down
32 changes: 32 additions & 0 deletions tests/online/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,26 @@
"udt_ary3": [_udt3_values_empty, _udt3_values_empty, _udt3_values],
}

_aoi1_values = {
"EnableIn": True,
"EnableOut": True,
"b1": False,
"b2": True,
**{f"b{i}": False for i in range(3, 21)},
"param_dint1": 1234,
"local_dint1": 0,
"local_udt1": {
"udt1": _udt1_values_empty,
"udt_ary1": [_udt1_values_empty for _ in range(5)],
"udt2": _udt2_values_empty,
"udt_ary2": [_udt2_values_empty for _ in range(5)],
"str1": "",
"str_ary1": ["", "", "", "", ""],
"udt3": _udt3_values_empty,
"udt_ary3": [_udt3_values_empty for _ in range(3)],
},
}

BASE_STRUCT_TESTS = [
# struct of just atomic values
("_udt1", "pycomm3_AtomicUDT", _udt1_values),
Expand Down Expand Up @@ -262,4 +282,16 @@
("_str480_ary1[0]{4}", "STRING480[4]", _str480_ary1_values),
("_str480_ary1[3]", "STRING480", _str480_ary1_values[3]),
("_str480_ary1[1]{2}", "STRING480[2]", _str480_ary1_values[1:3]),
# aoi
("_aoi1", "pycomm3_AOI", _aoi1_values),
("_aoi1.b1", "BOOL", _aoi1_values["b1"]),
("_aoi1.b2", "BOOL", _aoi1_values["b2"]),
("_aoi1.b16", "BOOL", _aoi1_values["b16"]),
("_aoi1.b17", "BOOL", _aoi1_values["b17"]),
("_aoi1.b20", "BOOL", _aoi1_values["b20"]),
("_aoi1.param_dint1", "DINT", _aoi1_values["param_dint1"]),
("_aoi1.param_dint1.1", "BOOL", True),
("_aoi1.param_dint1.31", "BOOL", False),
("_aoi1.local_dint1", "DINT", _aoi1_values["local_dint1"]),
("_aoi1.local_udt1", "pycomm3_NestedUDT", _aoi1_values["local_udt1"]),
]
37 changes: 21 additions & 16 deletions tests/online/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@


all_write_tests = [
*[(f'write{tag}', dt, val) for tag, dt, val in BASE_ATOMIC_TESTS],
*[(f'Program:pycomm3.write_prog{tag}', dt, val) for tag, dt, val in BASE_ATOMIC_TESTS],
*[(f'write{tag}', dt, val) for tag, dt, val in BASE_ATOMIC_ARRAY_TESTS],
*[(f'Program:pycomm3.write_prog{tag}', dt, val) for tag, dt, val in BASE_ATOMIC_ARRAY_TESTS],
*[(f'write{tag}', dt, val) for tag, dt, val in BASE_STRUCT_TESTS],
*[(f'Program:pycomm3.write_prog{tag}', dt, val) for tag, dt, val in BASE_STRUCT_TESTS],
*[(f"write{tag}", dt, val) for tag, dt, val in BASE_ATOMIC_TESTS if "aoi" not in tag],
*[(f"Program:pycomm3.write_prog{tag}", dt, val) for tag, dt, val in BASE_ATOMIC_TESTS if "aoi" not in tag],
*[(f"write{tag}", dt, val) for tag, dt, val in BASE_ATOMIC_ARRAY_TESTS if "aoi" not in tag],
*[(f"Program:pycomm3.write_prog{tag}", dt, val) for tag, dt, val in BASE_ATOMIC_ARRAY_TESTS if "aoi" not in tag],
*[(f"write{tag}", dt, val) for tag, dt, val in BASE_STRUCT_TESTS if "aoi" not in tag],
*[(f"Program:pycomm3.write_prog{tag}", dt, val) for tag, dt, val in BASE_STRUCT_TESTS if "aoi" not in tag],
]


@pytest.mark.parametrize('tag_name, data_type, value', all_write_tests)
@pytest.mark.parametrize("tag_name, data_type, value", all_write_tests)
def test_writes(plc, tag_name, data_type, value):
result = plc.write((tag_name, value))
assert result
Expand All @@ -26,11 +26,14 @@ def test_writes(plc, tag_name, data_type, value):
assert result == plc.read(tag_name) # read the same tag and make sure it matches


@pytest.mark.parametrize('tag_name, data_type, value', (
('write_bool_ary1[0]{32}', 'BOOL[32]', _bool_array[:32]),
('write_bool_ary1[32]{32}', 'BOOL[32]', _bool_array[32:64]),
('write_bool_ary1[32]{64}', 'BOOL[64]', _bool_array[32:]),
))
@pytest.mark.parametrize(
"tag_name, data_type, value",
(
("write_bool_ary1[0]{32}", "BOOL[32]", _bool_array[:32]),
("write_bool_ary1[32]{32}", "BOOL[32]", _bool_array[32:64]),
("write_bool_ary1[32]{64}", "BOOL[64]", _bool_array[32:]),
),
)
def test_bool_array_writes(plc, tag_name, data_type, value):
result = plc.write((tag_name, value))
assert result
Expand All @@ -42,7 +45,7 @@ def test_bool_array_writes(plc, tag_name, data_type, value):


def test_bool_array_invalid_writes(plc):
result = plc.write('write_bool_ary1[1]{2}', [True, False])
result = plc.write("write_bool_ary1[1]{2}", [True, False])
assert not result


Expand Down Expand Up @@ -99,9 +102,11 @@ def test_multi_write(plc):

def test_duplicate_tags_in_request(plc):
tags = [
('write_int_max.0', True), ('write_int_max.1', False),
('write_int_max', 32_767), ('write_int_min', -32_768),
('write_bool_ary1[1]', False)
("write_int_max.0", True),
("write_int_max.1", False),
("write_int_max", 32_767),
("write_int_min", -32_768),
("write_bool_ary1[1]", False),
]

results = plc.write(*tags, *tags)
Expand Down
Loading

0 comments on commit ab8f92e

Please sign in to comment.