-
Notifications
You must be signed in to change notification settings - Fork 0
/
telegram_bot.py
196 lines (166 loc) · 8.28 KB
/
telegram_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import os
import time
from .tg_config import *
from ._logger import logger
from .io_handler import get_temp_dir, create_zip_archive, walk_dir, clr_temp_dir, get_administrators, get_logfile
from .mysql_database.database_handler import DatabaseHandler
from telebot import TeleBot
from requests.exceptions import ReadTimeout
from mysql.connector.errors import DatabaseError
class TelebotTemplate(TeleBot):
    """TeleBot subclass with whitelist- and administrator-gated commands.

    The command handlers are registered as closures inside ``__init__``.
    Per-user configuration and the whitelist are read and written through
    ``self.db_client``; the administrator list comes from
    ``get_administrators()``.
    """

    def __init__(self):
        """Create the bot, open the database handler and register all commands."""
        super().__init__(token=TELEGRAM_BOT_TOKEN)
        self.db_client = DatabaseHandler()
        logger.info(f'{self.get_me().first_name} initialized')

        @self.message_handler(commands=['help'])
        @self.user_is_whitelisted
        def on_help(message):
            """Reply with the link to the external documentation."""
            self.reply_to(message, f'View the documentation for the bot here:\n{EXTERNAL_DOCUMENTATION_LINK}')

        @self.message_handler(commands=['viewconfig'])
        @self.user_is_whitelisted
        def on_viewconfig(message):
            """Returns the current configuration of the bot (used as reference for /setconfig)"""
            try:
                config = self.db_client.pull_user_config(user_id=message.from_user.id)
                header = f"{message.from_user.username} {self.get_me().first_name} Configuration:\n\n"
                # One "name=value" line per setting, in the order the config yields them.
                body = "".join(f'{k}={v}\n' for k, v in config.items())
                self.reply_to(message, header + body)
            except Exception as exc:
                logger.exception('Could not call /viewconfig', exc_info=exc)
                self.reply_to(message, str(exc))

        @self.message_handler(commands=['setconfig'])
        @self.user_is_whitelisted
        def on_setconfig(message):
            """Used to change configuration variables of the bot

            Expects one or more space-separated ``NAME=VALUE`` pairs after the
            command. Every pair is applied independently: successes and
            failures are reported back in separate replies, so one bad pair
            does not prevent the others from being stored.
            """
            applied = []
            failed = []
            user_id = message.from_user.id
            config = self.db_client.pull_user_config(user_id)
            try:
                for change in self.split_message(message.text):
                    try:
                        conf, val, var_type = self._parse_config_change(change, config)
                        # Attempt to push the config update to the database:
                        self.db_client.update_user_config(user_id=user_id, config_name=conf,
                                                          new_value=var_type(val))
                        applied.append(f"{conf} set to {val}\n")
                        logger.info(f"{user_id}: {conf} set to {val} ({var_type})\n")
                    except Exception as exc:
                        # Collect the failure and keep going with the remaining pairs.
                        failed.append((change, str(exc)))
                if applied:
                    self.reply_to(message, "Successfully set configuration:\n\n" + "".join(applied))
                if failed:
                    fail_msg = "Failed to set:\n\n" + "".join(
                        f"- {change} (Error: {error})\n" for change, error in failed)
                    self.reply_to(message, fail_msg)
            except Exception as exc:
                logger.exception('Could not set config', exc_info=exc)
                self.reply_to(message, str(exc))

        @self.message_handler(commands=['whitelist'])
        @self.user_is_administrator
        def on_whitelist(message):
            """Whitelist the given user ids, or list the current whitelist.

            Ids may be separated by commas and/or spaces. When the command
            carries no ids, the current whitelist is sent back instead.
            """
            try:
                # Accept "1,2", "1, 2" and "1 2" alike; never whitelist an empty id.
                new_users = [user.strip()
                             for chunk in self.split_message(message.text)
                             for user in chunk.split(",")
                             if user.strip()]
                if new_users:
                    for user in new_users:
                        self.db_client.whitelist_user(user)
                    self.reply_to(message, f'Whitelisted: {new_users}')
                    logger.info(f'Whitelisted: {new_users}')
                else:
                    msg = f"{self.get_me().full_name} Whitelist:\n"
                    msg += "".join(f"- {user}\n" for user in self.db_client.load_whitelist())
                    self.reply_to(message, msg)
            except Exception as exc:
                logger.exception('Could not call /whitelist', exc_info=exc)
                self.reply_to(message, str(exc))

        @self.message_handler(commands=['restartbot'])
        @self.user_is_administrator
        def on_restartbot(message):
            """Announce the restart, then raise to interrupt the bot."""
            self.reply_to(message, f'Bot restarting - '
                                   f'Please wait {ERROR_RESTART_DELAY} seconds')
            # NOTE(review): presumably this is expected to surface from polling()
            # so that run_bot's generic handler sleeps and restarts - confirm for
            # the telebot threading mode in use.
            raise Exception("Bot restart called")

        @self.message_handler(commands=['getlogs'])
        @self.user_is_administrator
        def on_getlogs(message):
            """Send the current logfile to the chat the command came from."""
            self.reply_to(message, 'Fetching logfile...')
            # Close the file handle once the upload call returns.
            with open(get_logfile(), 'rb') as logfile:
                self.send_document(message.chat.id, logfile)

    @staticmethod
    def _parse_config_change(change, config):
        """Split one ``NAME=VALUE`` chunk and resolve the type to store it as.

        :param change: A single ``NAME=VALUE`` string
        :param config: Mapping of the current config names to their values
        :return: Tuple ``(name, value, var_type)`` where ``var_type`` is the type of the currently stored value
        :raises ValueError: If the chunk has no ``=``, or a bool setting is given something other than true/false
        :raises KeyError: If the name is not present in ``config``
        """
        # Split on the first '=' only, so values may themselves contain '='.
        conf, separator, val = change.partition('=')
        if not separator:
            raise ValueError(f"{change} is not in NAME=VALUE form.")
        # Account for bool case:
        if val.lower() in ('true', 'false'):
            val = val.lower() == 'true'
        try:
            var_type = type(config[conf])
        except KeyError:
            raise KeyError(f"{conf} does not match any available config settings in database.") from None
        # bool("anything") is True, so refuse text that is not an explicit true/false.
        if var_type is bool and not isinstance(val, bool):
            raise ValueError(f"{conf} expects true or false.")
        return conf, val, var_type

    def split_message(self, message, convert_type=None) -> list:
        """Return the space-separated arguments that follow the command word.

        The first space-separated token (the command itself) is dropped, each
        remaining chunk is stripped, and chunks that are empty after stripping
        are discarded.

        :param message: Raw message text, e.g. ``"/setconfig a=1 b=2"``
        :param convert_type: Optional callable applied to every chunk
        :return: List of chunks (converted when ``convert_type`` is given)
        """
        stripped = (chunk.strip() for chunk in message.split(" ")[1:])
        chunks = [chunk for chunk in stripped if chunk]
        if convert_type is None:
            return chunks
        return [convert_type(chunk) for chunk in chunks]

    def user_is_whitelisted(self, func):
        """
        (Decorator) Checks if the user is whitelisted before proceeding with the function
        :param func: Expects the function to be a message handler, with the 'message' class as the first argument
        :return: Wrapped handler; it returns False (after replying) when the user is not whitelisted
        """
        import functools  # local import keeps the file's import block untouched

        @functools.wraps(func)
        def wrapper(*args, **kw):
            message = args[0]
            user_id = str(message.from_user.id)
            if user_id in self.db_client.load_whitelist():
                return func(*args, **kw)
            self.reply_to(message, f"{message.from_user.username} is not whitelisted.\n"
                                   f"Please send your user ID ({user_id}) the developer: {DEVELOPER_CONTACT}")
            return False

        return wrapper

    def user_is_administrator(self, func):
        """
        (Decorator) Checks if the user is an administrator before proceeding with the function
        :param func: Expects the function to be a message handler, with the 'message' class as the first argument
        :return: Wrapped handler; it returns False (after replying) when the user is not an administrator
        """
        import functools  # local import keeps the file's import block untouched

        @functools.wraps(func)
        def wrapper(*args, **kw):
            message = args[0]
            user_id = str(message.from_user.id)
            if user_id in get_administrators():
                return func(*args, **kw)
            self.reply_to(message,
                          f"{message.from_user.username} ({message.from_user.id}) is not an administrator.")
            return False

        return wrapper

    def _broadcast(self, recipients, message: str) -> None:
        """Send ``message`` to every recipient, logging (not raising) per-recipient failures."""
        for user in recipients:
            try:
                self.send_message(chat_id=user, text=message)
            except Exception as exc:
                # One unreachable chat must not stop the remaining alerts, nor
                # escape from run_bot's error handlers and end the restart loop.
                logger.exception(f'Could not deliver alert to {user}', exc_info=exc)

    def alert_users(self, message: str) -> None:
        """Send ``message`` to every whitelisted user (best effort)."""
        self._broadcast(self.db_client.load_whitelist(), message)

    def alert_admins(self, message: str) -> None:
        """Send ``message`` to every administrator (best effort)."""
        self._broadcast(get_administrators(), message)

    def run_bot(self):
        """Poll Telegram until interrupted, restarting after recoverable errors.

        A read timeout or any unexpected exception is logged, reported to the
        administrators and followed by a sleep of ``ERROR_RESTART_DELAY``
        seconds before polling again. A database error or a keyboard
        interrupt ends the loop.
        """
        while True:
            try:
                logger.info("Bot started")
                # Poll for changes
                self.polling()
            except ReadTimeout:
                err_msg = f'Bot has crashed due to read timeout - restarting in {ERROR_RESTART_DELAY} seconds...'
                logger.error(err_msg)
                self.alert_admins(err_msg)
                time.sleep(ERROR_RESTART_DELAY)
            except DatabaseError as exc:
                logger.exception('MySQL error has occurred', exc_info=exc)
                self.alert_admins(f'A critical database error has occurred:\n{exc}')
                break
            except KeyboardInterrupt:
                logger.info("Bot stopping for keyboard interrupt...")
                break
            except Exception as exc:
                logger.exception(f'Bot has unexpectedly crashed - restarting in {ERROR_RESTART_DELAY}:', exc_info=exc)
                self.alert_admins(f'Bot has unexpectedly crashed - Error {exc}')
                time.sleep(ERROR_RESTART_DELAY)