Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
Update notify.py
Browse files Browse the repository at this point in the history
  • Loading branch information
cziter15 authored Aug 24, 2023
1 parent e3063c9 commit ac3d65d
Showing 1 changed file with 54 additions and 97 deletions.
151 changes: 54 additions & 97 deletions custom_components/mr6400sms/notify.py
Original file line number Diff line number Diff line change
@@ -1,106 +1,63 @@
import asyncio
import async_timeout
import re
import rsa
import base64
import binascii
import logging
import aiohttp

from asyncio import TimeoutError
from aiohttp.client_exceptions import ClientError
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.notify import (ATTR_TARGET, PLATFORM_SCHEMA, BaseNotificationService)
from .tpclient import *

# Const values
_LOGIN_TIMEOUT_SECONDS = 5
# Const values.
MAX_LOGIN_RETRIES = 3
ROUTER_USERNAME = "admin"

class TPCError(Exception):
pass
# Config names.
CONF_ROUTER_IP = 'router_ip'
CONF_ROUTER_PWD = 'router_pwd'

class MR6400:
def __init__(self, hostname):
self.hostname = hostname
self.token = None
self._encryptedUsername = None
self._encryptedPassword = None
self._baseurl = "http://{}/".format(self.hostname)
# Extend PLATFORM_SCHEMA for config params.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_ROUTER_IP): cv.string,
vol.Required(CONF_ROUTER_PWD): cv.string,
})

def buildUrl(self, path):
return self._baseurl + path
# Get the LOGGER.
_LOGGER = logging.getLogger(__name__)

async def logout(self):
self.websession = None
self.token = None
def get_service(hass, config, discovery_info=None):
return MR6400SMSNotificationService(config)

def encryptDataRSA(self, data, nn, ee):
public_key = rsa.PublicKey(int(nn, 16), int(ee,16))
encrypted_data = rsa.encrypt(data, public_key)
encrypted_hex = binascii.hexlify(encrypted_data).decode('utf-8')
return encrypted_hex
class MR6400SMSNotificationService(BaseNotificationService):
def __init__(self, config):
self.router_ip = config.get(CONF_ROUTER_IP)
self.router_pwd = config.get(CONF_ROUTER_PWD)

def encryptString(self, value, nn, ee):
encoded = base64.b64encode(value.encode("utf-8"))
return self.encryptDataRSA(encoded, nn, ee)

def extractKeyPart(self, responseText, pattern):
exp = re.compile(pattern, re.IGNORECASE)
match = exp.search(responseText)
return match.group(1) if match else None

async def encryptCredentials(self, username, password):
try:
async with async_timeout.timeout(_LOGIN_TIMEOUT_SECONDS):
url = self.buildUrl('cgi/getParm')
headers = {'Referer': self._baseurl}
async with self.websession.post(url, headers=headers) as response:
if response.status != 200:
raise TPCError("Invalid encryption key request, status: " + str(response.status))
responseText = await response.text()
ee = self.extractKeyPart(responseText, r'(?<=ee=")(.{5}(?:\s|.))')
nn = self.extractKeyPart(responseText, r'(?<=nn=")(.{255}(?:\s|.))')
self._encryptedUsername = self.encryptString(username, nn, ee)
self._encryptedPassword = self.encryptString(password, nn, ee)
except (TimeoutError, ClientError, TPCError) as e:
raise TPCError("Could not retrieve encryption key, reason: " + str(e))

async def login(self, websession, username, password):
async def perform_logout(self, tpc):
try:
self.websession = websession
await self.encryptCredentials(username, password)
await asyncio.sleep(0.1)
async with async_timeout.timeout(_LOGIN_TIMEOUT_SECONDS):
url = self.buildUrl('cgi/login')
params = {'UserName': self._encryptedUsername, 'Passwd': self._encryptedPassword, 'Action': '1', 'LoginStatus':'0' }
headers = { 'Referer': self._baseurl }
async with self.websession.post(url, params=params, headers=headers) as response:
if response.status != 200:
raise TPCError("Invalid login request")
for cookie in self.websession.cookie_jar:
if cookie["domain"] == self.hostname and cookie.key == 'JSESSIONID':
await self.getToken()
return
raise TPCError("Invalid credentials")
except (TimeoutError, ClientError, TPCError) as e:
self.websession = None
raise TPCError("Could not login, reason: " + str(e))

async def getToken(self):
try:
async with async_timeout.timeout(_LOGIN_TIMEOUT_SECONDS):
url = self.buildUrl('')
async with self.websession.get(url) as response:
if response.status != 200:
raise TPCError("Invalid token request, status: " + str(response.status))
responseText = await response.text()
p = re.compile(r'(?<=token=")(.{29}(?:\s|.))', re.IGNORECASE)
m = p.search(responseText)
if m:
self.token = m.group(1)
except (TimeoutError, ClientError, TPCError) as e:
raise TPCError("Could not retrieve token, reason: " + str(e))

async def sms(self, phone, message):
url = self.buildUrl('cgi')
params = { '2': '' }
data = "[LTE_SMS_SENDNEWMSG#0,0,0,0,0,0#0,0,0,0,0,0]0,3\r\nindex=1\r\nto={0}\r\ntextContent={1}\r\n".format(phone, message)
headers= { 'Referer': self._baseurl, 'TokenID': self.token }
async with self.websession.post(url, params=params, data=data, headers=headers) as response:
if response.status != 200:
raise TPCError("Failed sending SMS")
await tpc.logout()
except TPCError as e:
_LOGGER.error(e)

async def perform_login(self, tpc, websession, password):
for retries in range(MAX_LOGIN_RETRIES):
try:
await tpc.login(websession, ROUTER_USERNAME, password)
break
except TPCError as e:
if retries < MAX_LOGIN_RETRIES - 1:
_LOGGER.warning("Retrying login (%d of %d) due to exception {%s}", retries + 1, MAX_LOGIN_RETRIES, e)
await asyncio.sleep(1)

async def async_send_message(self, message, **kwargs):
phone_numbers = kwargs.get(ATTR_TARGET)

async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True)) as websession:
tpc = MR6400(self.router_ip)
try:
await self.perform_login(tpc, websession, self.router_pwd)
for phone in phone_numbers:
await tpc.sms(phone, message)
_LOGGER.info("Sent SMS to %s: %s", phone, message)
except TPCError as e:
_LOGGER.error(e)
finally:
await self.perform_logout(tpc)

0 comments on commit ac3d65d

Please sign in to comment.