-
Notifications
You must be signed in to change notification settings - Fork 1
/
watchdog.py
executable file
·155 lines (131 loc) · 5.69 KB
/
watchdog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/python3 -u
##################### Configuration Begin ######################
YOUR_QUESTION = '12 + 16 = ?'
YOUR_ANSWER = '28'
TELEGRAM_API_ID = '11111111' # Get api_id and api_hash at my.telegram.org
TELEGRAM_API_HASH = '67e72cc9e2b603e08d05446ad5ef8e6'
TELEGRAM_PHONE = '+12223334444' # Phone number in International Format. Example: '+8617719890604'
##################### Configuration End ########################
from telegram.client import Telegram
import threading, time, os
tg = Telegram(
api_id=TELEGRAM_API_ID,
api_hash=TELEGRAM_API_HASH,
phone=TELEGRAM_PHONE,
database_encryption_key='any_password',
files_directory='tdlib_files/',
)
whitelist_filename = 'whitelisted_chats.log'
whitelisted_chat_ids = []
magic_text = '[tqYH5C]'
msg_verify = 'This account is protected by Telegram Antispam WatchDog.\nPlease answer the question to continue:\n请正确回答以下问题:\n\n' + YOUR_QUESTION
msg_whitelisted = '[Telegram Antispam Watchdog] Whitelisted this chat.'
msg_passed = 'You have passed the verification. Thanks!\n你已经通过验证, 感谢你的理解!'
# We need to mark_message_read() for 30 times, with one second interval. That's the only method to eliminate GMS notification.
# Format: [(chat_id, msg_id, count), ...]
# count will decrease from 30 to 0 by a timer in another thread.
remove_gms_notify_queue = []
remove_gms_notify_queue_lock = threading.Lock()
def read_whitelist_from_disk(fname):
try:
with open(fname, 'r') as f:
for l in f.read().split('\n'):
if l != '':
whitelisted_chat_ids.append(int(l))
except FileNotFoundError:
pass
def write_whitelist_to_disk(fname):
with open(fname, 'w+') as f:
f.write('\n'.join([str(i) for i in whitelisted_chat_ids]))
def mark_msg_read(chat_id, msg_id):
# This function must be called multiple times. For example, call it once a second, for 8 times.
# You must call mark_msg_read_finish() after the last mark_msg_read(). You must wait as long as possible before calling mark_msg_read_finish(), to make the mark_msg_read reliable.
# This problem only appears in GMS notification.
fn_data = {
'@type': 'openChat',
'chat_id': chat_id,
}
tg._tdjson.send(fn_data)
fn_data = {
'@type': 'viewMessages',
'chat_id': chat_id,
'message_ids': [msg_id],
'force_read': True,
}
tg._tdjson.send(fn_data)
def mark_msg_read_finish(chat_id):
fn_data = {
'@type': 'closeChat',
'chat_id': chat_id,
}
tg._tdjson.send(fn_data)
def timer_handler():
# In every second, check if there is any message to be marked as read.
global remove_gms_notify_queue
with remove_gms_notify_queue_lock:
result_list = []
for entry in remove_gms_notify_queue:
chat_id, msg_id, count = entry
mark_msg_read(chat_id, msg_id)
if count-1 > 0:
result_list.append((chat_id, msg_id, count-1))
else:
mark_msg_read_finish(chat_id)
remove_gms_notify_queue = result_list
def new_message_handler(update):
chat_id = update['message']['chat_id']
msg_id = update['message']['id']
message_content = update['message']['content']
is_outgoing = update['message']['is_outgoing']
message_text = message_content.get('text', {}).get('text', '')
# This handler will block all message which satisfies ALL of the following condition:
# 1. Incoming
# 2. Not from group chat (Personal chat)
# 3. chat_id is not in whitelist
# 4. chat_id is not 777000 (Telegram official notification)
# Maybe we can whitelist sender_id instead of chat_id, but I think it doesn't make a difference.
if chat_id < 0 or chat_id == 777000:
return
if chat_id in whitelisted_chat_ids:
return
if is_outgoing:
# Send any outgoing message to add unknown chat to whitelist. (Except verification message)
if magic_text not in message_text:
whitelisted_chat_ids.append(chat_id)
write_whitelist_to_disk(whitelist_filename)
tg.send_message(chat_id=chat_id, text=msg_whitelisted)
return
print("DEBUG: Received a new private chat message which needs verification, chat_id=", chat_id)
# Mark as read to suppress the notification.
mark_msg_read(chat_id, msg_id)
if message_content['@type'] == 'messageText' and message_text.lower() == YOUR_ANSWER.lower():
# Answer is correct: add to whitelist and send hello
print("DEBUG: good answer")
whitelisted_chat_ids.append(chat_id)
write_whitelist_to_disk(whitelist_filename)
tg.send_message(chat_id=chat_id, text=msg_passed)
else:
# Answer is not correct: send verification message and delete his message.
print("DEBUG: bad answer")
tg.send_message(chat_id=chat_id, text=magic_text + msg_verify)
tg.delete_messages(chat_id, [msg_id])
with remove_gms_notify_queue_lock:
remove_gms_notify_queue.append((chat_id, msg_id, 16))
def timer_thread_func():
while True:
timer_handler()
time.sleep(1)
if __name__ == "__main__":
read_whitelist_from_disk(whitelist_filename)
tg.login()
# if this is the first run, library needs to preload all chats
# otherwise the message will not be sent
result = tg.get_chats()
result.wait()
print("Started Telegram Antispam Watchdog. API test by listing your chats: ", result.update)
threading.Thread(target=timer_thread_func).start()
tg.add_message_handler(new_message_handler)
tg.idle() # blocking waiting for CTRL+C
tg.stop() # you must call `stop` at the end of the script
print("Exited")
os._exit(0)