From b047a183d911db6bd2b3f9ee723870423e07aa34 Mon Sep 17 00:00:00 2001 From: Heinrich Grattenthaler Date: Fri, 27 Sep 2024 17:29:21 +0200 Subject: [PATCH] add support for set_attributes_all (0x02) --- server/enip/client.py | 35 ++++++++++++++++- server/enip/device.py | 75 ++++++++++++++++++++++++++++++++++++ server/enip/get_attribute.py | 3 +- 3 files changed, 109 insertions(+), 4 deletions(-) diff --git a/server/enip/client.py b/server/enip/client.py index 36383ca9..28de9217 100644 --- a/server/enip/client.py +++ b/server/enip/client.py @@ -147,7 +147,7 @@ def parse_context( sender_context ): # def int_validate( x, lo, hi ): - res = int( x ) + res = int( x, base = 0) assert lo <= res <= hi, "Invalid %d; not in range (%d,%d)" % ( res, lo, hi) return res @@ -857,6 +857,32 @@ def get_attributes_all( self, path, sender_context=sender_context, **kwds ) return req + def set_attributes_all( self, path, data, elements=1, tag_type=None, + route_path=None, send_path=None, timeout=None, send=True, + sender_context=b'', **kwds): + + if elements is None: + elements = len( data ) + else: + assert elements == len( data ), \ + "Inconsistent elements: %d doesn't match data length: %d" % ( elements, len( data )) + + req = dotdict() + req.path = { 'segment': [ + dotdict( d ) for d in device.parse_path( path ) + ]} + req.set_attributes_all= { + 'data': data, + 'elements': elements, + } + + if send: + self.req_send( + request=req, route_path=route_path, send_path=send_path, timeout=timeout, + sender_context=sender_context, **kwds ) + + return req + def get_attribute_single( self, path, route_path=None, send_path=None, timeout=None, send=True, data_size=None, elements=None, tag_type=None, # for response data_size estimation @@ -1373,6 +1399,11 @@ def issue( self, operations, index=0, fragment=False, multiple=0, timeout=None ) tag_type=op.get( 'tag_type' ) or parser.DINT.tag_type, size=op.get( 'elements', 1 )) else: rpyest = multiple + elif method == "set_attributes_all": + descr += "S_A_A" + req = self.set_attributes_all( timeout=timeout, send=not multiple, **op ) + reqest = 8 + parser.typed_data.datasize( parser.USINT.tag_type, size=len( op['data'] )) + rpyest = 4 elif method == "service_code": req = self.service_code( timeout=timeout, send=not multiple, **op ) reqest = 1 + len( req.input ) # We've rendered the Service Request payload @@ -1504,7 +1535,7 @@ def collect( self, timeout=None ): elif reply.status in (0x00,0x06) and 'get_attributes_all' in reply: val = reply.get_attributes_all.data elif reply.status in (0x00,): - # eg. 'set_attribute_single', 'write_{tag,frag}', 'service_code', etc... + # eg. 'set_attribute_single', 'set_attributes_all', 'write_{tag,frag}', 'service_code', etc... val = True else: # Failure; val is Falsey if 'status_ext' in reply and reply.status_ext.size: diff --git a/server/enip/device.py b/server/enip/device.py index dce53eac..8d9aa900 100644 --- a/server/enip/device.py +++ b/server/enip/device.py @@ -920,6 +920,11 @@ def register_service_parser( cls, number, name, short, machine ): GA_ALL_REQ = 0x01 GA_ALL_RPY = GA_ALL_REQ | 0x80 + SA_ALL_NAM = "Set Attributes All" + SA_ALL_CTX = "set_attributes_all" + SA_ALL_REQ = 0x02 + SA_ALL_RPY = SA_ALL_REQ | 0x80 + GA_LST_NAM = "Get Attribute List" GA_LST_CTX = "get_attribute_list" GA_LST_REQ = 0x03 @@ -1062,6 +1067,9 @@ def request( self, data, addr=None ): elif ( data.get( 'service' ) == self.GA_ALL_REQ or self.GA_ALL_CTX in data and data.setdefault( 'service', self.GA_ALL_REQ ) == self.GA_ALL_REQ ): pass + elif ( data.get( 'service' ) == self.SA_ALL_REQ + or self.SA_ALL_CTX in data and data.setdefault( 'service', self.SA_ALL_REQ ) == self.SA_ALL_REQ ): + pass elif ( data.get( 'service' ) == self.SA_SNG_REQ or self.SA_SNG_CTX in data and data.setdefault( 'service', self.SA_SNG_REQ ) == self.SA_SNG_REQ ): pass @@ -1086,6 +1094,38 @@ def request( self, data, addr=None ): data.get_attributes_all = dotdict() data.get_attributes_all.data = [ b if type( b ) is int else ord( b ) for b in result ] + elif data.service == self.SA_ALL_RPY: + # Set Attributes All. Convert unsigned ints to bytes, parse appropriate + # elements using the Attribute's .parser, and assign. Must produce exactly the + # correct number of bytes to fully populate the Attributes. + + a_id = 1 + sum_siz = 0 + while str(a_id) in self.attribute: + + att = self.attribute[str(a_id)] + att_siz = att.parser.struct_calcsize * len(att) + sum_siz += att_siz + a_id += 1 + + assert 'set_attributes_all.data' in data and len(data.set_attributes_all.data) == sum_siz, \ + "Expected %d total bytes in .set_attributes_all.data" % (sum_siz,) + + a_id = 1 + data_ptr = 0 + while str(a_id) in self.attribute: + + att = self.attribute[str(a_id)] + siz = att.parser.struct_calcsize + att_siz = siz * len(att) + fmt = att.parser.struct_format + buf = bytearray( data.set_attributes_all.data[data_ptr:data_ptr + att_siz] ) + val = [struct.unpack( fmt, buf[i:i+siz] )[0] + for i in range(0, len(buf), siz) ] + att[:] = val + a_id += 1 + data_ptr += att_siz + elif data.service == self.GA_LST_RPY: # Get Attribute List. Collect up the bytes representing the attributes. Converts a # placehold .get_attribute_list = [,...] list of attribute numbers with @@ -1217,6 +1257,12 @@ def produce( cls, data ): # Get Attributes All result += USINT.produce( data.service ) result += EPATH.produce( data.path ) + elif cls.SA_ALL_CTX in data and data.setdefault( 'service', cls.SA_ALL_REQ ) == cls.SA_ALL_REQ: + # Set Attributes All + result += USINT.produce( data.service ) + result += EPATH.produce( data.path ) + result += typed_data.produce( data.set_attributes_all, + tag_type=USINT.tag_type ) elif cls.GA_SNG_CTX in data and data.setdefault( 'service', cls.GA_SNG_REQ ) == cls.GA_SNG_REQ: # Get Attribute Single result += USINT.produce( data.service ) @@ -1242,6 +1288,12 @@ def produce( cls, data ): if data.status == 0x00: result += typed_data.produce( data.get_attributes_all, tag_type=USINT.tag_type ) + elif data.get( 'service' ) == cls.SA_ALL_RPY: + # Set Attributes All Reply. + result += USINT.produce( data.service ) + result += b'\x00' # reserved + result += status.produce( data ) + elif data.get( 'service' ) == cls.GA_LST_RPY: # Get Attribute List Reply result += USINT.produce( data.service ) @@ -1328,6 +1380,29 @@ def __get_attributes_all_reply(): Object.register_service_parser( number=Object.GA_ALL_RPY, name=Object.GA_ALL_NAM + " Reply", short=Object.GA_ALL_CTX, machine=__get_attributes_all_reply() ) +def __set_attributes_all( ): + srvc = USINT( context='service' ) + srvc[True] = path = EPATH( context='path') + path[True] = typed_data( context=Object.SA_ALL_CTX, + tag_type=USINT.tag_type, + terminal=True ) + return srvc + +Object.register_service_parser( number=Object.SA_ALL_REQ, name=Object.SA_ALL_NAM, + short=Object.SA_ALL_CTX, machine=__set_attributes_all() ) + +def __set_attributes_all_reply(): + srvc = USINT( context='service' ) + srvc[True] = rsvd = octets_drop( 'reserved', repeat=1 ) + rsvd[True] = stts = status() + stts[None] = mark = octets_noop( context=Object.SA_ALL_CTX, + terminal=True ) + mark.initial[None] = move_if( 'mark', initializer=True ) + return srvc + +Object.register_service_parser( number=Object.SA_ALL_RPY, name=Object.SA_ALL_NAM + " Reply", + short=Object.SA_ALL_CTX, machine=__set_attributes_all_reply() ) + def __get_attribute_list(): srvc = USINT( context='service' ) srvc[True] = path = EPATH( context='path') diff --git a/server/enip/get_attribute.py b/server/enip/get_attribute.py index 31511422..7ee7d698 100755 --- a/server/enip/get_attribute.py +++ b/server/enip/get_attribute.py @@ -101,8 +101,7 @@ def attribute_operations( paths, int_type=None, **kwds ): for op in client.parse_operations( paths, int_type=int_type or 'SINT', **kwds ): path_end = op['path'][-1] if 'instance' in path_end: - op['method'] = 'get_attributes_all' - assert 'data' not in op, "All Attributes cannot be operated on using Set Attribute services" + op['method'] = 'set_attributes_all' if 'data' in op else 'get_attributes_all' elif 'symbolic' in path_end or 'attribute' in path_end or 'element' in path_end: op['method'] = 'set_attribute_single' if 'data' in op else 'get_attribute_single' else: