Skip to content

Commit

Permalink
Thread-Safe Processing
Browse files Browse the repository at this point in the history
Make code thread-safe
  • Loading branch information
Jon authored and Jon committed Mar 12, 2018
1 parent f461c9b commit 593b837
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 266 deletions.
2 changes: 1 addition & 1 deletion Alexa-Hue Bridge.indigoPlugin/Contents/Info.plist
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<plist version="1.0">
<dict>
<key>PluginVersion</key>
<string>3.0.26</string>
<string>3.0.27</string>
<key>ServerApiVersion</key>
<string>2.0</string>
<key>IwsApiVersion</key>
Expand Down
85 changes: 43 additions & 42 deletions Alexa-Hue Bridge.indigoPlugin/Contents/Server Plugin/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,35 +73,35 @@ def __init__(self, plugin, ahbDevId):

self.interrupted = False

broadcast_data = {"broadcast_ip": BCAST_IP,
"upnp_port": UPNP_PORT,
"server_ip": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['host'],
"server_port": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['port'],
"uuid": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['uuid']}
self.broadcast_packet = broadcast_packet % broadcast_data
self.broadcast_data = {"broadcast_ip": BCAST_IP,
"upnp_port": UPNP_PORT,
"server_ip": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['host'],
"server_port": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['port'],
"uuid": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['uuid']}
self.broadcast_packet = broadcast_packet % self.broadcast_data
except StandardError, e:
PLUGIN.broadcasterLogger.error(u"StandardError detected in Broadcaster.Init for '{}'. Line '{}' has error='{}'".format(indigo.devices[ahbDevId].name, sys.exc_traceback.tb_lineno, e))

def run(self):
try:
PLUGIN.broadcasterLogger.debug("Broadcaster.run called")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20)
start_time = time.time()
end_time = start_time + (PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] * 60)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20)
self.start_time = time.time()
self.end_time = self.start_time + (PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] * 60)
PLUGIN.broadcasterLogger.debug("Broadcaster.run: sending first broadcast:\n{}".format(self.broadcast_packet))
while True:
sock.sendto(self.broadcast_packet, (BCAST_IP, UPNP_PORT))
self.sock.sendto(self.broadcast_packet, (BCAST_IP, UPNP_PORT))
for x in range(BROADCAST_INTERVAL):
time.sleep(1.5)
# Following code will only time out the Broadcaster Thread if PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] > 0 (valid values 0 thru 10 inclusive)
# A value of zero means 'always on'
if PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] and time.time() > end_time:
if PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] and time.time() > self.end_time:
PLUGIN.broadcasterLogger.debug("Broadcaster thread timed out")
self.stop()
if self.interrupted:
PLUGIN.setDeviceDiscoveryState(False, self.ahbDevId)
sock.close()
self.sock.close()
return

except StandardError, e:
Expand All @@ -122,58 +122,59 @@ def __init__(self, plugin, ahbDevId):

try:
self.ahbDevId = ahbDevId
self.ahbName = indigo.devices[self.ahbDevId].name

PLUGIN.responderLogger.debug("Responder.__init__ for '{}' is running".format(PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['hubName']))

self.interrupted = False

response_data = {"server_ip": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['host'],
"server_port": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['port'],
"uuid": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['uuid']}
self.response_packet = response_packet % response_data
self.response_data = {"server_ip": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['host'],
"server_port": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['port'],
"uuid": PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['uuid']}
self.response_packet = response_packet % self.response_data
except StandardError, e:
PLUGIN.responderLogger.error(u"StandardError detected in Responder.Init for '{}'. Line '{}' has error='{}'".format(indigo.devices[ahbDevId].name, sys.exc_traceback.tb_lineno, e))

def run(self):
try:
PLUGIN.responderLogger.debug("Responder.run called")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
try:
sock.bind(('', UPNP_PORT))
sock.setsockopt(socket.IPPROTO_IP,
self.sock.bind(('', UPNP_PORT))
self.sock.setsockopt(socket.IPPROTO_IP,
socket.IP_ADD_MEMBERSHIP,
socket.inet_aton(BCAST_IP) + socket.inet_aton(PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['host']))
sock.settimeout(1)
start_time = time.time()
end_time = start_time + (PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] * 60)
self.sock.settimeout(1)
self.start_time = time.time()
self.end_time = self.start_time + (PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] * 60)
while True:
try:
data, addr = sock.recvfrom(1024)
self.data, self.addr = self.sock.recvfrom(1024)

# Following code discards non-Echo network traffic if option set in plugin config
if PLUGIN.globals['amazonEchoDeviceFilterActive']:
if addr[0] not in PLUGIN.globals['amazonEchoDevices'].values():
if self.addr[0] not in PLUGIN.globals['amazonEchoDevices']:
if PLUGIN.globals['debug']['filter']:
PLUGIN.responderLogger.debug("Responder.respond called from SKIPPED address '{}'".format(str(addr[0])))
PLUGIN.responderLogger.debug("Responder.respond called from SKIPPED address '{}'".format(str(self.addr[0])))
continue

# Following code will only time out the Broadcaster Thread if PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] > 0 (valid values 0 thru 10 inclusive)
# A value of zero means 'always on'
if PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] and time.time() > end_time:
if PLUGIN.globals['alexaHueBridge'][self.ahbDevId]['discoveryExpiration'] and time.time() > self.end_time:
PLUGIN.responderLogger.debug("Responder.run thread timed out")
self.stop()
raise socket.error
except socket.error:
if self.interrupted:
PLUGIN.responderLogger.debug("Responder.run: self.interrupted: True")
PLUGIN.setDeviceDiscoveryState(False, self.ahbDevId)
sock.close()
self.sock.close()
return
else:
if M_SEARCH_REQ_MATCH in data:
PLUGIN.responderLogger.debug("Responder.run: received: {}".format(str(data)))
self.respond(addr)
if M_SEARCH_REQ_MATCH in self.data:
PLUGIN.responderLogger.debug("Responder.run: received from {}: {}".format(self.addr, str(self.data)))
self.respond(self.addr)
except socket.error as e:
# This is the exception thrown when someone else has bound to the UPNP port, so write some errors and
# stop the thread (which really isn't needed, but it logs a nice stop debug message).
Expand All @@ -197,13 +198,13 @@ def stop(self):
PLUGIN.responderLogger.debug("Responder thread stopped")
self.interrupted = True

def respond(self, addr):
time.sleep(1.5)
PLUGIN.responderLogger.debug("Responder.respond called from address {}\n{}".format(str(addr), self.response_packet))
PLUGIN.responderLogger.debug("Responder.respond: creating output_socket")
output_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
PLUGIN.responderLogger.debug("Responder.respond: calling output_socket.sendto")
output_socket.sendto(self.response_packet, addr)
PLUGIN.responderLogger.debug("Responder.respond: closing output_socket")
output_socket.close()
PLUGIN.responderLogger.debug("Responder.respond: UDP Response sent to {}".format(str(addr)))
def respond(self, p1_addr, ):
#time.sleep(1.5)
PLUGIN.responderLogger.debug("Responder.respond for {}, called from address {}\n{}".format(self.ahbName, str(p1_addr), self.response_packet))
PLUGIN.responderLogger.debug("Responder.respond: for {}, creating output_socket".format(self.ahbName))
self.output_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
PLUGIN.responderLogger.debug("Responder.respond: for {}, calling output_socket.sendto".format(self.ahbName))
self.output_socket.sendto(self.response_packet, p1_addr)
PLUGIN.responderLogger.debug("Responder.respond: for {}, closing output_socket".format(self.ahbName))
self.output_socket.close()
PLUGIN.responderLogger.debug("Responder.respond: for {}, UDP Response sent to {}".format(self.ahbName, str(p1_addr)))
Loading

0 comments on commit 593b837

Please sign in to comment.