-
Notifications
You must be signed in to change notification settings - Fork 0
/
charger2.py
248 lines (205 loc) · 7.92 KB
/
charger2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# -*- coding: utf-8 -*-
import time
from json import JSONDecodeError
import requests
from requests import ConnectionError
# API V1 see: https://github.com/goecharger/go-eCharger-API-v1
# API V2 see: https://github.com/goecharger/go-eCharger-API-v2
import const
def search_charger(ip_root, tout=0.2):
    """
    Scan a range of host addresses for a charger that answers the API v2
    status request.

    The candidates are built as ``ip_root + str(i)`` for i = 1 .. 249, so
    ip_root must already contain the scheme and end right before the host
    number (presumably something like "http://192.168.0." - verify against
    the caller).

    :param ip_root: string: url prefix the host number is appended to
    :param tout: timeout in seconds for each single request. Default = 0.2
    :return: url of the first host answering with HTTP 200,
             or the string "-1" if none did
    """
    command = "/api/status?filter=fna"
    for i in range(1, 250):
        ip = ip_root + str(i)
        print("searching charger on", ip)
        try:
            response = requests.get(ip + command, timeout=tout)
        except requests.RequestException:
            # Host unreachable, timed out or not speaking HTTP: try the next
            # one. Only request errors are skipped so that Ctrl-C and
            # programming errors still surface during the long scan.
            continue
        if response.status_code == 200:
            print("IP found:", ip)
            return ip
    return "-1"
class Charger:
    """
    Small HTTP client for a go-eCharger, supporting API version 1 and 2.

    The last known charger state is cached in ``self.chargerData``. Besides
    the charger keys, the cache carries two local bookkeeping entries:
    'statusCode' (result of the last status request) and 'apiVer'.
    """

    # Entries of chargerData maintained by this class, not by the charger.
    # They must never be sent to the charger as status filter keys.
    _LOCAL_KEYS = ('statusCode', 'apiVer')

    def __init__(self, url, api_version=2, timeout=15):
        """
        Init function at instantiation of this class

        :param url: WiFi or cloud url of charger
        :param api_version: API version (series CM-02: 1, series CM-03: 2)
        :param timeout: optional seconds. Default = 15
        :raises ValueError: if url is None or empty
        """
        if url is None or url == '':
            raise ValueError("Please set charger url")
        self.url = url
        self.api_version = api_version
        self.timeout = timeout
        if self.api_version == 2:
            self.chargerData = {'car': 0, 'amp': 0, 'frc': 0, 'nrg': 15 * [0], 'fsp': 'true', 'psm': 1,
                                'wh': 0, 'dwo': 0, 'acs': 0, 'err': -1, 'statusCode': 200}
        else:
            self.chargerData = {'car': 0, 'amp': 0, 'nrg': 15 * [0], 'pha': 0, 'dwo': 0, 'ast': 1, 'err': -1,
                                'statusCode': -1}

    def __search_charger(self, tout=0.5):
        """
        INTERNAL function: scan host numbers 1 .. 249 for a charger

        :param tout: timeout in seconds for each single request. Default = 0.5
        :return: url of the first host answering with HTTP 200,
                 or the string "-1" if none did
        """
        # NOTE(review): the original read self.url_root, which is never
        # assigned in this class. An externally set url_root is still
        # honoured; otherwise the prefix is derived from self.url by dropping
        # everything after the last '.' - confirm this is the intended root.
        root = getattr(self, 'url_root', None)
        if root is None:
            head, dot, _ = self.url.rpartition('.')
            root = head + dot
        command = "/api/status?filter=fna"
        for i in range(1, 250):
            ip = root + str(i)
            try:
                response = requests.get(ip + command, timeout=tout)
            except requests.RequestException:
                # unreachable host or timeout: try the next address
                continue
            if response.status_code == 200:
                print("IP found:", ip)
                return ip
        return "-1"

    def _status_command(self):
        """
        INTERNAL function: build the path and query of the status request

        :return: "/status" for API version 1; for API version 2 the status
                 path with a filter listing the cached charger keys
        """
        if self.api_version != 2:
            return "/status"
        keys = [key for key in self.chargerData if key not in self._LOCAL_KEYS]
        return '/api/status?filter=' + ','.join(keys)

    def get_charger_data(self):
        """
        Get relevant data from charger

        Never raises on communication problems; the outcome is reported in
        the 'statusCode' entry instead: HTTP status of the response,
        -1 connection error, -2 response is not valid JSON, -3 any other error.

        :return: dictionary containing actual charger status
        """
        print('\nObtaining charger data ...')
        command = self._status_command()
        statusCode = -1
        try:
            response = requests.get(self.url + command, timeout=self.timeout)
            statusCode = response.status_code
            if statusCode == 200:
                jsonData = response.json()
                # only keys already present in the cache are taken over
                for key in self.chargerData:
                    if key in jsonData:
                        self.chargerData[key] = jsonData[key]
        except ConnectionError:
            statusCode = -1
            print('CONNECTION ERROR!')
        except JSONDecodeError:
            statusCode = -2
            print('JSON ERROR')
        except Exception:
            # deliberate catch-all (e.g. timeouts): callers rely on a status
            # code rather than an exception
            statusCode = -3
            print('UNKNOWN ERROR')
        self.chargerData['apiVer'] = self.api_version
        self.chargerData['statusCode'] = statusCode
        return self.chargerData

    def __set_charger_param(self, param, value):
        """
        INTERNAL function set a single parameter of the charger

        Nothing is sent unless the cached 'statusCode' of the last status
        request is 200; in that case the cached status is returned unchanged.

        :param param: string: charger parameter to be set
        :param value: integer: value to be set
        :return: HTTP status of the set request. On communication error: -1
        """
        statusCode = self.chargerData['statusCode']
        if statusCode == 200:
            value = str(value)
            if self.api_version == 2:
                command = self.url + '/api/set?' + param + "=" + value
            else:
                command = self.url + '/mqtt?payload=' + param + "=" + value
            print('\nSetting charger Data', command)
            try:
                response = requests.get(command, timeout=self.timeout)
                # report the charger's actual answer, not the cached status
                statusCode = response.status_code
            except requests.RequestException:
                # connection failures and timeouts alike
                statusCode = -1
        return statusCode

    def start_charging(self):
        """
        Starts charging with previously set current

        :return: HTTP status, 200 is OK
        """
        if self.api_version == 1:
            status_code = self.__set_charger_param('alw', 1)
            if status_code == 200:
                status_code = self.__set_charger_param('ast', 0)
        else:
            status_code = self.__set_charger_param('acs', 0)  # authenticate
            if status_code == 200:
                status_code = self.__set_charger_param('frc', 2)  # start charger
        return status_code

    def stop_charging(self, authenticate=1):
        """
        Stops charger and enables authentication

        :param authenticate: value written to 'ast' (API version 1) or
                             'acs' (API version 2) after stopping. Default = 1
        :return: HTTP status, 200 is OK
        """
        if self.api_version == 1:
            status_code = self.__set_charger_param('alw', 0)
            if status_code == 200:
                status_code = self.__set_charger_param('ast', authenticate)
        else:
            status_code = self.__set_charger_param('frc', 1)
            if status_code == 200:
                # authenticate in order to prevent automatic start
                status_code = self.__set_charger_param('acs', authenticate)
        return status_code

    def set_current(self, current):
        """
        Set charging current in Ampere

        :param current: value as integer
        :return: HTTP status, 200 is OK, -2 if current exceeds
                 const.C_CHARGER_MAX_CURRENT
        """
        status_code = -2  # parameter Error
        param = 'amx' if self.api_version == 1 else 'amp'
        if current <= const.C_CHARGER_MAX_CURRENT:
            status_code = self.__set_charger_param(param, current)
        return status_code

    def set_phase(self, phase):
        """
        Switch 1 to 3 phase or vice versa.

        :param phase: value 1 or 3 (any value other than 1 selects 3 phases).
                      Only valid for API version 2 (charger series CM-03)
        :return: HTTP status, 200 is OK, -1 if command not recognized
        """
        if self.api_version == 1:
            status_code = -1
            print("phase command not supported in go-e charger CM01 and CM02")
        else:
            if phase == 1:
                value = const.C_CHARGER_1_PHASE
            else:
                value = const.C_CHARGER_3_PHASES
            status_code = self.__set_charger_param('psm', value)
        return status_code
# test
if __name__ == "__main__":
    # Fixed addresses can be used instead of the search:
    # go = Charger("http://192.168.0.30", 2)
    # go = Charger("http://192.168.0.11", 1)
    charger_ip = search_charger(const.C_CHARGER_WIFI_URL)
    if charger_ip == "-1":
        # search_charger() reports "no charger found" with the string "-1"
        raise SystemExit("No charger found")
    go = Charger(charger_ip, 2)
    # status = go.get_charger_data()
    # status = go.set_current(7)
    # print('set_current() status =', status)
    # status = go.set_phase(1)
    # print('set_phase() status =', status)
    # status = go.start_charging()
    # print('START status =', status)
    # wait = 20
    # print('SLEEPING ... ', wait)
    # time.sleep(wait)
    # status = go.get_charger_data()
    # print('get_charger_data() status =', status)
    # status = go.stop_charging()
    # print('STOP status =', status)
    # #
    # # go = Charger("http://192.168.0.11", 1)
    # # status = go.set_current(8)
    # # print('status =', status)
    # #
    # # status = go.get_charger_data()
    # #
    # # status = go.set_phase(1)
    # # print(status)