Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add locking support when accessing the GPIO pins #10

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions energenie/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import absolute_import
from .energenie import switch_on, switch_off
from .energenie import switch, switch_on, switch_off


__version__ = '1.0.1'
__version__ = '1.1.1'
113 changes: 71 additions & 42 deletions energenie/energenie.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,87 @@
import RPi.GPIO as GPIO
from time import sleep
from fcntl import flock, LOCK_EX

# The GPIO pins for the Energenie module
BIT1 = 17
BIT2 = 22
BIT3 = 23
BIT4 = 27

ON_OFF_KEY = 24
# Set default transmission duration of radio signal in seconds.
# Setting to None or a value less than 0.25 will result in the code
# using the minimum sane value of 0.25 seconds which is the shortest
# possible time that one control frame can be broadcast in.
#
# Increasing this value may be beneficial if noise is anticipated
# in the radio spectrum or a large distance between the transmitter
# and sockets is expected because the transmitter will remain in a
# constant loop broadcasting the same control frame over and over
# again for the duration it is enabled. This can increase the chances
# of sockets receiving a valid recognisable control frame, which has
# not been corrupted, that can subsequently be acted upon.
#
DURATION = None


# Define GPIO pins for the Energenie module.
#
D0 = 17
D1 = 22
D2 = 23
D3 = 27
MODSEL = 24
ENABLE = 25

# Assign all of the above into a master list. Place D3, D2, D1 and D0
# first, in that order, for use with enumeration later on.
#
ALL = [D3, D2, D1, D0, MODSEL, ENABLE]


# Codes for switching ON (True) and OFF (False) the sockets.
#
# Group: ALL 1 2 3 4
# D0-D3: 3210 3210 3210 3210 3210
#
CODES = {
True: ['1011', '1111', '1110', '1101', '1100'], # ON
False: ['0011', '0111', '0110', '0101', '0100'], # OFF
}


GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

GPIO.setup(BIT1, GPIO.OUT)
GPIO.setup(BIT2, GPIO.OUT)
GPIO.setup(BIT3, GPIO.OUT)
GPIO.setup(BIT4, GPIO.OUT)

GPIO.setup(ON_OFF_KEY, GPIO.OUT)
GPIO.setup(ENABLE, GPIO.OUT)

GPIO.output(ON_OFF_KEY, False)
GPIO.output(ENABLE, False)

GPIO.output(BIT1, False)
GPIO.output(BIT2, False)
GPIO.output(BIT3, False)
GPIO.output(BIT4, False)

# Codes for switching on and off the sockets
# all 1 2 3 4
ON = ['1011', '1111', '1110', '1101', '1100']
OFF = ['0011', '0111', '0110', '0101', '0100']


def change_plug_state(socket, on_or_off):
state = on_or_off[socket][3] == '1'
GPIO.output(BIT1, state)
state = on_or_off[socket][2] == '1'
GPIO.output(BIT2, state)
state = on_or_off[socket][1] == '1'
GPIO.output(BIT3, state)
state = on_or_off[socket][0] == '1'
GPIO.output(BIT4, state)

def reset():
map(lambda p: GPIO.setup(p, GPIO.OUT), ALL)
map(lambda p: GPIO.output(p, False), ALL)
sleep(0.1)


def broadcast(state, socket, duration):
duration = max(duration, 0.25)
lock = open('/run/GPIO.lock', 'a')
flock(lock, LOCK_EX)
reset()
try:
map(lambda (i, p): GPIO.output(p, CODES[state][socket][i] == '1'),
enumerate(ALL[:4])
)
except:
lock.close()
return
sleep(0.1)
GPIO.output(ENABLE, True)
sleep(0.25)
sleep(duration)
GPIO.output(ENABLE, False)
reset()
lock.close()


def switch(state, socket=0, duration=DURATION):
broadcast(bool(state), socket, duration)


def switch_on(socket=0):
change_plug_state(socket, ON)
def switch_on(socket=0, duration=DURATION):
switch(True, socket, duration)


def switch_off(socket=0):
change_plug_state(socket, OFF)
def switch_off(socket=0, duration=DURATION):
switch(False, socket, duration)