-
Notifications
You must be signed in to change notification settings - Fork 0
/
building_data_requests.py
106 lines (71 loc) · 2.67 KB
/
building_data_requests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Copyright 2018 Building Energy Gateway. All rights reserved.
import requests
import json
# Default gateway hostname, used by post_request() when the caller supplies none
PUBLIC_HOSTNAME = "energize.andoverma.us"

# Port used by post_request() for any other hostname when the caller supplies no port.
# Kept as a string because it is concatenated directly into the URL.
INTERNAL_PORT = "8000"
# Request present value and units for the supplied instance
def get_value(facility, instance, gateway_hostname=None, gateway_port=None, live=False):
    """Request the present value and units of a single instance.

    Args:
        facility: Facility identifier, sent to the web service unchanged.
        instance: Instance to read; must be a positive integer (or a string
            of digits representing one) or a non-empty string.
        gateway_hostname: Optional hostname, passed through to post_request().
        gateway_port: Optional port, passed through to post_request().
        live: When truthy, a 'live' flag is added to the request arguments.

    Returns:
        A (value, units) tuple.  Both elements are None when the instance
        fails validation (no request is issued in that case) or when either
        'success' flag in the response is falsy.
    """
    value = None
    units = None

    # Ensure that instance is either a positive integer or non-empty string.
    # str.isdigit() also accepts characters (e.g. superscript digits) that
    # int() cannot parse, so the conversion is guarded; such input is then
    # validated as a plain string instead of raising ValueError.
    instance_text = str(instance)
    number = None
    if instance_text.isdigit():
        try:
            number = int(instance_text)
        except ValueError:
            number = None

    if number is not None:
        is_valid = number > 0
    else:
        is_valid = len(instance_text.strip()) > 0

    if is_valid:
        # Set up request arguments
        args = {
            'facility': facility,
            'instance': instance,
        }
        if live:
            args['live'] = True

        # Issue request to web service
        gateway_rsp = post_request(gateway_hostname, gateway_port, args)

        # Convert JSON response to Python dictionary
        dc_rsp = json.loads(gateway_rsp.text)

        # Extract instance response from the dictionary
        dc_inst_rsp = dc_rsp['instance_response']

        # Extract result only when both the instance response and its data
        # report success
        if dc_inst_rsp['success']:
            dc_data = dc_inst_rsp['data']
            if dc_data['success']:
                value = dc_data['presentValue']
                units = dc_data['units']

    return value, units
# Request multiple values from Building Energy Gateway
def get_bulk(bulk_request, gateway_hostname=None, gateway_port=None):
    """Request multiple values in a single call to the web service.

    Args:
        bulk_request: List describing the values to fetch; it is serialized
            to JSON and sent as the 'bulk' request argument.
        gateway_hostname: Optional hostname, passed through to post_request().
        gateway_port: Optional port, passed through to post_request().

    Returns:
        The decoded JSON response, or an empty list when bulk_request is not
        a non-empty list (no request is issued in that case).
    """
    # Guard clause: only a non-empty list is worth sending
    if not isinstance(bulk_request, list) or not bulk_request:
        return []

    # Serialize the request and post it to the web service
    payload = {'bulk': json.dumps(bulk_request)}
    response = post_request(gateway_hostname, gateway_port, payload)

    # Decode the JSON body of the response
    return json.loads(response.text)
# Post request to web service
def post_request(gateway_hostname, gateway_port, args):
    """Post the supplied arguments to the web service and return the response.

    URL selection:
      * An explicit port always yields 'http://<host>:<port>'.
      * With no port, PUBLIC_HOSTNAME yields 'https://<host>'.
      * With no port, any other host yields 'http://<host>:<INTERNAL_PORT>'.

    Args:
        gateway_hostname: Target hostname; a falsy value selects PUBLIC_HOSTNAME.
        gateway_port: Target port (any value convertible with str()); a falsy
            value means no explicit port.
        args: Form data sent as the body of the POST.

    Returns:
        The response object returned by requests.post().
    """
    # Fall back to the default hostname and normalize the port to a string
    host = gateway_hostname or PUBLIC_HOSTNAME
    explicit_port = str(gateway_port) if gateway_port else ''

    # Choose scheme and port fragment
    if explicit_port:
        scheme = 'http'
        port_fragment = ':' + explicit_port
    elif host == PUBLIC_HOSTNAME:
        scheme = 'https'
        port_fragment = ''
    else:
        scheme = 'http'
        port_fragment = ':' + INTERNAL_PORT

    # Assemble the URL and post the request
    target_url = scheme + '://' + host + port_fragment
    return requests.post(target_url, data=args)