Skip to content

Commit

Permalink
Addresses some of the comments in issue #14.
Browse files Browse the repository at this point in the history
- Convert all values received by `Packet.parse_msg` to integers.
- Replace `RadioPacket.sender` and `RadioPacket.destination` with lists of integers. Create corresponding functions for receiving hex and integer values.
- Move commonly used stuff to `enocean.utils`
  • Loading branch information
kipe committed Feb 9, 2016
1 parent 1d4194a commit c2e3922
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 51 deletions.
12 changes: 5 additions & 7 deletions enocean/protocol/eep.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from bs4 import BeautifulSoup
import logging

import enocean.utils

logger = logging.getLogger('enocean.protocol.eep')

path = os.path.dirname(os.path.realpath(__file__))
Expand All @@ -20,10 +22,6 @@ def __init__(self):
logger.warn('Cannot load protocol file!')
pass

def _get_hex(self, nmb):
''' Get hex-representation of number '''
return '0x%02X' % (nmb)

def _get_raw(self, source, bitarray):
''' Get raw data as integer, based on offset and size '''
offset = int(source['offset'])
Expand Down Expand Up @@ -125,17 +123,17 @@ def find_profile(self, rorg, func, type, direction=None):
logger.warn('EEP.xml not loaded!')
return None

rorg = self.soup.find('telegram', {'rorg': self._get_hex(rorg)})
rorg = self.soup.find('telegram', {'rorg': '0x%s' % enocean.utils._to_hex_string(rorg)})
if not rorg:
logger.warn('Cannot find rorg in EEP!')
return None

func = rorg.find('profiles', {'func': self._get_hex(func)})
func = rorg.find('profiles', {'func': '0x%s' % enocean.utils._to_hex_string(func)})
if not func:
logger.warn('Cannot find func in EEP!')
return None

profile = func.find('profile', {'type': self._get_hex(type)})
profile = func.find('profile', {'type': '0x%s' % enocean.utils._to_hex_string(type)})
if not profile:
logger.warn('Cannot find type in EEP!')
return None
Expand Down
76 changes: 32 additions & 44 deletions enocean/protocol/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import print_function, unicode_literals, division
import logging

import enocean.utils
from enocean.protocol import crc8
from enocean.protocol.eep import EEP
from enocean.protocol.constants import PACKET, RORG, PARSE_RESULT, DB0, DB2, DB3
Expand Down Expand Up @@ -45,45 +46,20 @@ def __unicode__(self):
def __eq__(self, other):
return self.type == other.type and self.rorg == other.rorg and self.data == other.data and self.optional == other.optional

def _get_bit(self, byte, bit):
''' Get bit value from byte '''
return (byte >> bit) & 0x01

def _combine_hex(self, data):
''' Combine list of integer values to one big integer '''
output = 0x00
for i, d in enumerate(reversed(data)):
output |= (d << i * 8)
return output

def _to_bitarray(self, data, width=8):
''' Convert data (list of integers, bytearray or integer) to bitarray '''
if isinstance(data, list) or isinstance(data, bytearray):
data = self._combine_hex(data)
return [True if digit == '1' else False for digit in bin(data)[2:].zfill(width)]

def _from_bitarray(self, data):
''' Convert bit array back to integer '''
return int(''.join(['1' if x else '0' for x in data]), 2)

def _to_hex_string(self, data):
''' Convert list of integers to a hex string, separated by ":" '''
return ':'.join([('%02X' % o) for o in data])

def _data_to_bitdata(self):
''' Updates the bit_data member based on data member.'''
if self.rorg == RORG.RPS or self.rorg == RORG.BS1:
self.bit_data = self._to_bitarray(self.data[1], 8)
self.bit_data = enocean.utils._to_bitarray(self.data[1], 8)
if self.rorg == RORG.BS4:
self.bit_data = self._to_bitarray(self.data[1:5], 32)
self.bit_data = enocean.utils._to_bitarray(self.data[1:5], 32)

def _bitdata_to_data(self):
''' Updates the data member based on bit_data member.'''
if self.rorg in [RORG.RPS, RORG.BS1]:
self.data[1] = self._from_bitarray(self.bit_data)
self.data[1] = enocean.utils._from_bitarray(self.bit_data)
if self.rorg == RORG.BS4:
for byte in range(4):
self.data[byte+1] = self._from_bitarray(self.bit_data[byte*8:(byte+1)*8])
self.data[byte+1] = enocean.utils._from_bitarray(self.bit_data[byte*8:(byte+1)*8])

@staticmethod
def parse_msg(buf):
Expand All @@ -101,7 +77,7 @@ def parse_msg(buf):

# Valid buffer starts from 0x55
# Convert to list, as index -method isn't defined for bytearray
buf = buf[list(buf).index(0x55):]
buf = [ord(x) if not isinstance(x, int) else x for x in buf[list(buf).index(0x55):]]
try:
data_len = (buf[1] << 8) | buf[2]
opt_len = buf[3]
Expand Down Expand Up @@ -211,11 +187,11 @@ def parse(self):
self.status = self.data[-1]
if self.rorg == RORG.VLD:
self.status = self.optional[-1]
self.bit_status = self._to_bitarray(self.status)
self.bit_status = enocean.utils._to_bitarray(self.status)

if self.rorg in [RORG.RPS, RORG.BS1, RORG.BS4]:
# These message types should have repeater count in the last for bits of status.
self.repeater_count = self._from_bitarray(self.bit_status[4:])
self.repeater_count = enocean.utils._from_bitarray(self.bit_status[4:])
return self.parsed

def select_eep(self, func, type, direction=None):
Expand All @@ -242,7 +218,7 @@ def set_eep(self, data):
self._data_to_bitdata()
# data is a dict with EEP description keys
self.bit_data, self.bit_status = self.eep.set_values(self._profile, self.bit_data, self.bit_status, data)
self.status = self._from_bitarray(self.bit_status)
self.status = enocean.utils._from_bitarray(self.bit_status)
# update data based on bit_data
self._bitdata_to_data()

Expand All @@ -258,11 +234,9 @@ def build(self):


class RadioPacket(Packet):
destination = 0
destination_hex = ''
destination = [0xFF, 0xFF, 0xFF, 0xFF]
dBm = 0
sender = 0
sender_hex = ''
sender = [0xFF, 0xFF, 0xFF, 0xFF]
learn = True
contains_eep = False

Expand All @@ -277,12 +251,26 @@ def create(rorg, func, type, direction=None,
learn=False, **kwargs):
return Packet.create(PACKET.RADIO, rorg, func, type, direction, destination, sender, learn, **kwargs)

@property
def sender_int(self):
return enocean.utils._combine_hex(self.sender)

@property
def sender_hex(self):
return enocean.utils._to_hex_string(self.sender)

@property
def destination_int(self):
return enocean.utils._combine_hex(self.destination)

@property
def destination_hex(self):
return enocean.utils._to_hex_string(self.destination)

def parse(self):
self.destination = self._combine_hex(self.optional[1:5])
self.destination_hex = self._to_hex_string(self.optional[1:5])
self.destination = self.optional[1:5]
self.dBm = -self.optional[5]
self.sender = self._combine_hex(self.data[-5:-1])
self.sender_hex = self._to_hex_string(self.data[-5:-1])
self.sender = self.data[-5:-1]
# Default to learn == True, as some devices don't have a learn button
self.learn = True

Expand All @@ -298,9 +286,9 @@ def parse(self):
self.contains_eep = self.bit_data[DB0.BIT_7]
if self.contains_eep:
# Get rorg_func and rorg_type from an unidirectional learn packet
self.rorg_func = self._from_bitarray(self.bit_data[DB3.BIT_7:DB3.BIT_1])
self.rorg_type = self._from_bitarray(self.bit_data[DB3.BIT_1:DB2.BIT_2])
self.rorg_manufacturer = self._from_bitarray(self.bit_data[DB2.BIT_2:DB0.BIT_7])
self.rorg_func = enocean.utils._from_bitarray(self.bit_data[DB3.BIT_7:DB3.BIT_1])
self.rorg_type = enocean.utils._from_bitarray(self.bit_data[DB3.BIT_1:DB2.BIT_2])
self.rorg_manufacturer = enocean.utils._from_bitarray(self.bit_data[DB2.BIT_2:DB0.BIT_7])
logger.debug('learn received, EEP detected, RORG: 0x%02X, FUNC: 0x%02X, TYPE: 0x%02X, Manufacturer: 0x%02X' % (self.rorg, self.rorg_func, self.rorg_type, self.rorg_manufacturer))

return super(RadioPacket, self).parse()
Expand Down
34 changes: 34 additions & 0 deletions enocean/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# -*- encoding: utf-8 -*-
from __future__ import print_function, unicode_literals, division


def _get_bit(byte, bit):
''' Get bit value from byte '''
return (byte >> bit) & 0x01


def _combine_hex(data):
''' Combine list of integer values to one big integer '''
output = 0x00
for i, d in enumerate(reversed(data)):
output |= (d << i * 8)
return output


def _to_bitarray(data, width=8):
''' Convert data (list of integers, bytearray or integer) to bitarray '''
if isinstance(data, list) or isinstance(data, bytearray):
data = _combine_hex(data)
return [True if digit == '1' else False for digit in bin(data)[2:].zfill(width)]


def _from_bitarray(data):
''' Convert bit array back to integer '''
return int(''.join(['1' if x else '0' for x in data]), 2)


def _to_hex_string(data):
''' Convert list of integers to a hex string, separated by ":" '''
if isinstance(data, int):
return '%02X' % data
return ':'.join([('%02X' % o) for o in data])
2 changes: 2 additions & 0 deletions tests/test_eep.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def test_temperature():
assert p.rorg_type is 0x05
assert p.status == 0x00
assert p.repeater_count == 0
assert p.sender == [0x01, 0x81, 0xB7, 0x44]
assert p.sender_hex == '01:81:B7:44'


def test_magnetic_switch():
Expand Down

2 comments on commit c2e3922

@romor
Copy link
Contributor

@romor romor commented on c2e3922 Feb 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a minor comment: the functions in utils.py are no more private and should not start with underscores anymore

@kipe
Copy link
Owner Author

@kipe kipe commented on c2e3922 Feb 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, fixing...

Please sign in to comment.