-
Notifications
You must be signed in to change notification settings - Fork 4
/
AppGetHistPower.py
executable file
·51 lines (40 loc) · 1.6 KB
/
AppGetHistPower.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/env python3
import socket
from protobuf_inspector.types import StandardParser
import AppGetHistPower_pb2
import time
import crcmod
# Define the server address and port
server_address = ('192.168.1.203', 10081)
crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
sequence = 1
try:
# Create a TCP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect to the server
client_socket.connect(server_address)
request = AppGetHistPower_pb2.AppGetHistPowerResDTO()
request.oft = 28800
request.time = int(time.time())
header = b'\x48\x4d\xa3\x15'
crc = crc16(request.SerializeToString())
length = len(request.SerializeToString())+10
# Send the data
#print(f"Sending: {header}{sequence.to_bytes(2,byteorder='big')}{crc.to_bytes(2, byteorder='big')}{length.to_bytes(2, byteorder='big')}{request.SerializeToString()}")
client_socket.send(header+sequence.to_bytes(2,byteorder='big')+crc.to_bytes(2, byteorder='big')+length.to_bytes(2, byteorder='big')+request.SerializeToString())
sequence=sequence+1
# Receive the response
response_data = client_socket.recv(1024)
#print(f"Response: {response_data}")
response = AppGetHistPower_pb2.AppGetHistPowerReqDTO()
response.ParseFromString(response_data[10:])
for field_descriptor, value in response.ListFields():
field_name = field_descriptor.name
field_value = value
print(f"{field_name}: {field_value} ", end='')
print('')
except Exception as e:
print(f'Error: {e}')
finally:
# Close the socket
client_socket.close()