-
Notifications
You must be signed in to change notification settings - Fork 104
/
module_wrapper_global.py
550 lines (470 loc) · 22.1 KB
/
module_wrapper_global.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
"""
Copyright (C) 2023-2024 Fern Lane
This file is part of the GPT-Telegramus distribution
(see <https://github.com/F33RNI/GPT-Telegramus>)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging
import queue
import time
import multiprocessing
from ctypes import c_bool, c_int32
from typing import Dict
from lmao.module_wrapper import STATUS_NOT_INITIALIZED, STATUS_IDLE, STATUS_BUSY, STATUS_FAILED
from lmao.module_wrapper import MODULES as LMAO_MODULES
import psutil
import messages
import users_handler
import request_response_container
from async_helper import async_helper
from bot_sender import send_message_async
from lmao_process_loop import LMAO_LOOP_DELAY, lmao_process_loop
from lmao_process_loop_web import lmao_process_loop_web
from google_ai_module import GoogleAIModule
from ms_copilot_module import MSCopilotModule
from ms_copilot_designer_module import MSCopilotDesignerModule
from groq_module import GroqModule
# List of available modules (their names)
# LlM-Api-Open (LMAO) modules should start with lmao_
# See <https://github.com/F33RNI/LlM-Api-Open> for more info
MODULES = [
"lmao_chatgpt",
"lmao_ms_copilot",
"chatgpt",
"dalle",
"ms_copilot",
"ms_copilot_designer",
"gemini",
"groq",
]
# Names of modules with conversation history (clearable)
MODULES_WITH_HISTORY = ["lmao_chatgpt", "lmao_ms_copilot", "chatgpt", "ms_copilot", "gemini", "groq"]
# Names of modules with ability to select model
MODULES_WITH_MODELS = ["groq"]
# Maximum time (in seconds) to wait for LMAO module to close before killing it's process
_LMAO_STOP_TIMEOUT = 10
# How long to wait for module no become not busy
_WAIT_FOR_IDLE_TIMEOUT = 10
class ModuleWrapperGlobal:
def __init__(
self,
name: str,
config: Dict,
messages_: messages.Messages,
users_handler_: users_handler.UsersHandler,
logging_queue: multiprocessing.Queue,
use_web: bool,
web_cooldown_timer: multiprocessing.Value,
web_request_lock: multiprocessing.Lock,
) -> None:
"""Module's class initialization here (and LMAO process initialization)
This is called from main process. Some other functions (see below) will be called from another processes
Args:
name (str): name of module to initialize (from MODULES)
config (Dict): global config
messages_ (messages.Messages): initialized messages wrapper
users_handler_ (users_handler.UsersHandler): initialized users handler
logging_queue (multiprocessing.Queue): initialized logging queue to handle logs from separate processes
use_web (bool): True to use web API for LMAO modules instead of python package
web_cooldown_timer (multiprocessing.Value): double value that stores last request time to LMAO in seconds
web_request_lock (multiprocessing.Lock): lock to prevent multiple process from sending multiple requests
Raises:
Exception: no module or module class __init__ error
"""
if name not in MODULES:
raise Exception(f"No module named {name}")
self.name = name
self.config = config
self.messages = messages_
self.users_handler = users_handler_
self.logging_queue = logging_queue
self.module = None
# PID for non-LMAO modules for is_busy() function
self._pid_value = multiprocessing.Value(c_int32, -1)
################
# LMAO modules #
################
# Use LMAO ModuleWrapper (initialization will be handled inside _lmao_process)
# All crap below is to adopt non-multiprocessing LMAO
# This will change if I switch LMAO to use multiprocessing instead of multithreading
if name.startswith("lmao_"):
self.name_lmao = name[5:]
if self.name_lmao not in LMAO_MODULES:
raise Exception(f"No lmao module named {self.name_lmao}")
# LMAO process variables
self._lmao_process_running = multiprocessing.Value(c_bool, False)
self._lmao_stop_stream = multiprocessing.Value(c_bool, False)
self._lmao_module_status = multiprocessing.Value(c_int32, STATUS_NOT_INITIALIZED)
# Queue of user_id (int) to clear conversation
self._lmao_delete_conversation_request_queue = multiprocessing.Queue(1)
# Queue of Exception or user_id (same as for requests) as a result of deleting conversation
self._lmao_delete_conversation_response_queue = multiprocessing.Queue(1)
# Queue of RequestResponseContainer for LMAO modules
self._lmao_request_queue = multiprocessing.Queue(1)
self._lmao_response_queue = multiprocessing.Queue(1)
# Queue of lmao Exceptions
self._lmao_exceptions_queue = multiprocessing.Queue(-1)
# Start LMAO process (LMAO modules needs to be loaded constantly so we need all that stuff at least for now)
logging.info("Starting _lmao_process_loop as process")
self._lmao_process = multiprocessing.Process(
target=lmao_process_loop_web if use_web else lmao_process_loop,
args=(
self.name,
self.name_lmao,
self.config,
self.messages,
self.users_handler,
self.logging_queue,
self._lmao_process_running,
self._lmao_stop_stream,
self._lmao_module_status,
self._lmao_delete_conversation_request_queue,
self._lmao_delete_conversation_response_queue,
self._lmao_request_queue,
self._lmao_response_queue,
self._lmao_exceptions_queue,
web_cooldown_timer,
web_request_lock,
),
)
with self._lmao_process_running.get_lock():
self._lmao_process_running.value = True
self._lmao_process.start()
# Wait to initialize or error
logging.info(f"Waiting for {self.name} initialization to finish")
while True:
try:
with self._lmao_module_status.get_lock():
module_status = self._lmao_module_status.value
if module_status == STATUS_IDLE or module_status == STATUS_FAILED:
logging.info(f"{self.name} initialization finished")
break
with self._lmao_process_running.get_lock():
process_running = self._lmao_process_running.value
if not process_running:
logging.info(f"{self.name} process finished")
break
time.sleep(LMAO_LOOP_DELAY)
except (SystemExit, KeyboardInterrupt):
logging.warning("Interrupted")
break
except Exception as e:
logging.error(f"Error waiting for {self.name} to initialize", exc_info=e)
break
##########
# Gemini #
##########
elif name == "gemini":
self.module = GoogleAIModule(config, self.messages, self.users_handler)
##############
# MS Copilot #
##############
elif name == "ms_copilot":
self.module = MSCopilotModule(config, self.messages, self.users_handler)
#######################
# MS Copilot Designer #
#######################
elif name == "ms_copilot_designer":
self.module = MSCopilotDesignerModule(config, self.messages, self.users_handler)
########
# Groq #
########
elif name == "groq":
self.module = GroqModule(config, self.messages, self.users_handler)
def is_busy(self) -> bool:
"""
Returns:
bool: True if current module is busy, False if not
"""
# LMAO module is busy if it's status is not IDLE
if self.name.startswith("lmao_"):
with self._lmao_module_status.get_lock():
module_status = self._lmao_module_status.value
return module_status != STATUS_IDLE
# Other modules -> check for process_request() process
else:
with self._pid_value.get_lock():
pid = self._pid_value.value
if pid >= -1 and psutil.pid_exists(pid):
return True
return False
def process_request(self, request_response: request_response_container.RequestResponseContainer) -> None:
"""Processes request
This is called from separate queue process (non main)
Args:
request_response (request_response_container.RequestResponseContainer): container from the queue
Raises:
Exception: process state / status or any other error
"""
# Set PID for is_busy() function
with self._pid_value.get_lock():
self._pid_value.value = multiprocessing.current_process().pid
# Extract user's language
user_id = request_response.user_id
lang_id = self.users_handler.get_key(user_id, "lang_id", "eng")
# Read user's last timestamp (integer) and module's cooldown
user_last_request_timestamp = self.users_handler.get_key(user_id, f"timestamp_{self.name}")
user_cooldown_seconds = self.config.get(self.name).get("user_cooldown_seconds")
# Check timeout
if user_last_request_timestamp is not None and user_cooldown_seconds is not None:
time_passed_seconds = int(time.time()) - user_last_request_timestamp
# Send timeout message and exit
if time_passed_seconds < user_cooldown_seconds:
request_response.error = True
logging.warning(f"User {user_id} sends {self.name} requests too quickly!")
self._user_module_cooldown(
request_response, user_id, lang_id, user_cooldown_seconds - time_passed_seconds
)
return
# Save current timestamp as integer
self.users_handler.set_key(user_id, f"timestamp_{self.name}", int(time.time()))
################
# LMAO modules #
################
# Redirect request to LMAO process and wait
if self.name.startswith("lmao_"):
# Check status
with self._lmao_process_running.get_lock():
process_running = self._lmao_process_running.value
if not process_running:
raise Exception(f"{self.name} process is not running")
with self._lmao_module_status.get_lock():
module_status = self._lmao_module_status.value
if module_status != STATUS_IDLE:
raise Exception(f"{self.name} status is not idle")
# Put to the queue
self._lmao_request_queue.put(request_response)
# Wait until it's processed or failed
logging.info(f"Waiting for {self.name} request to be processed (waiting for container)")
response_ = None
while True:
# Check process
with self._lmao_process_running.get_lock():
process_running = self._lmao_process_running.value
if not process_running:
raise Exception(f"{self.name} process stopped")
# Check error and re-raise exception
lmao_exception = None
try:
lmao_exception = self._lmao_exceptions_queue.get(block=False)
except queue.Empty:
pass
if lmao_exception is not None:
raise lmao_exception
# Try to get container back
try:
response_ = self._lmao_response_queue.get(block=False)
except queue.Empty:
pass
if response_:
logging.info(f"Received container back from {self.name} process")
break
time.sleep(LMAO_LOOP_DELAY)
# Update container
if response_:
request_response.response_text = response_.response_text
for response_image in response_.response_images:
request_response.response_images.append(response_image)
request_response.response_timestamp = response_.response_timestamp
request_response.response_send_timestamp_last = response_.response_send_timestamp_last
request_response.processing_state = response_.processing_state
request_response.message_id = response_.message_id
request_response.reply_markup = response_.reply_markup
request_response.processing_start_timestamp = response_.processing_start_timestamp
request_response.error = response_.error
request_response.response_next_chunk_start_index = response_.response_next_chunk_start_index
request_response.response_sent_len = response_.response_sent_len
else:
logging.warning(f"Unable to get container back from {self.name} process")
##########
# Gemini #
##########
elif self.name == "gemini":
self.module.initialize()
self.module.process_request(request_response)
##############
# MS Copilot #
##############
elif self.name == "ms_copilot":
self.module.initialize()
self.module.process_request(request_response)
self.module.exit()
#######################
# MS Copilot Designer #
#######################
elif self.name == "ms_copilot_designer":
self.module.initialize()
self.module.process_request(request_response)
########
# Groq #
########
elif self.name == "groq":
self.module.initialize()
self.module.process_request(request_response)
# Done
logging.info(f"{self.name} request processing finished")
def _user_module_cooldown(
self,
request: request_response_container.RequestResponseContainer,
user_id: int,
lang_id: str,
time_left_seconds: int,
) -> None:
"""Sends cooldown message to the user
Args:
request (request_response_container.RequestResponseContainer): container from the queue
user_id (int): ID of user (to not get it from container again)
lang_id (str): user's language (to not get it from container again)
time_left_seconds (int): how much user needs to wait
"""
# Calculate time left
if time_left_seconds < 0:
time_left_seconds = 0
time_left_hours = time_left_seconds // 3600
time_left_minutes = (time_left_seconds - (time_left_hours * 3600)) // 60
time_left_seconds = time_left_seconds - (time_left_hours * 3600) - (time_left_minutes * 60)
# Convert to string (ex. 1h 20m 9s)
time_left_str = ""
if time_left_hours > 0:
if len(time_left_str) > 0:
time_left_str += " "
time_left_str += str(time_left_hours) + self.messages.get_message("hours", lang_id=lang_id)
if time_left_minutes > 0:
if len(time_left_str) > 0:
time_left_str += " "
time_left_str += str(time_left_minutes) + self.messages.get_message("minutes", lang_id=lang_id)
if time_left_seconds > 0:
if len(time_left_str) > 0:
time_left_str += " "
time_left_str += str(time_left_seconds) + self.messages.get_message("seconds", lang_id=lang_id)
if time_left_str == "":
time_left_str = "0" + self.messages.get_message("seconds", lang_id=lang_id)
# Generate cooldown message
module_id = self.users_handler.get_key(user_id, "module", self.config.get("modules").get("default"))
module_icon_name = self.messages.get_message("modules", lang_id=lang_id).get(module_id)
module_name = f"{module_icon_name.get('icon')} {module_icon_name.get('name')}"
request.response_text = self.messages.get_message("user_cooldown_error", lang_id=lang_id).format(
time_formatted=time_left_str, module_name=module_name
)
# Send this message
async_helper(send_message_async(self.config.get("telegram"), self.messages, request, end=True))
def stop_stream(self) -> None:
"""Stops response
This is called from main process and it must NOT raise any errors
"""
# Redirect to LMAO process
if self.name.startswith("lmao_"):
with self._lmao_stop_stream.get_lock():
self._lmao_stop_stream.value = True
# Gemini
elif self.name == "gemini":
with self.module.cancel_requested.get_lock():
self.module.cancel_requested.value = True
# MS Copilot
elif self.name == "ms_copilot":
with self.module.cancel_requested.get_lock():
self.module.cancel_requested.value = True
def delete_conversation(self, user_id: int) -> None:
"""Deletes module's conversation history
This is called from main process and it MUST finish in a reasonable time
So it's good to start processes here to make sure they finished in case of some 3rd party API needs heavy work
Args:
user_id (int): ID or user to delete conversation for
Raises:
Exception: process state / status or any other error
"""
# Wait for module to become not busy or timeout
time_started = time.time()
while True:
if time.time() - time_started > _WAIT_FOR_IDLE_TIMEOUT:
raise Exception("Timeout waiting for module to become available. Please wait a bit and try again")
if not self.is_busy():
break
time.sleep(0.1)
# Redirect to LMAO process and wait
if self.name.startswith("lmao_"):
# Check status
with self._lmao_process_running.get_lock():
process_running = self._lmao_process_running.value
if not process_running:
raise Exception(f"{self.name} process is not running")
with self._lmao_module_status.get_lock():
module_status = self._lmao_module_status.value
if module_status != STATUS_IDLE:
raise Exception(f"{self.name} status is not idle")
# Put to the queue
self._lmao_delete_conversation_request_queue.put(user_id)
# Wait until it's processed or failed
logging.info(f"Waiting for {self.name} to delete conversation")
time.sleep(1)
while True:
# Check process
with self._lmao_process_running.get_lock():
process_running = self._lmao_process_running.value
if not process_running:
raise Exception(f"{self.name} process stopped")
# Check error and re-raise exception
delete_conversation_result = None
try:
delete_conversation_result = self._lmao_delete_conversation_response_queue.get(block=False)
except queue.Empty:
pass
if delete_conversation_result is not None:
# OK
if isinstance(delete_conversation_result, int):
break
# Error -> re-raise exception
else:
raise delete_conversation_result
time.sleep(LMAO_LOOP_DELAY)
# Gemini
elif self.name == "gemini":
self.module.clear_conversation_for_user(user_id)
# MS Copilot
elif self.name == "ms_copilot":
self.module.clear_conversation_for_user(user_id)
# Groq
elif self.name == "groq":
self.module.clear_conversation_for_user(user_id)
def on_exit(self) -> None:
"""Calls module's post-stop actions (and closes LMAO module)
This is called from main process
Raises:
Exception: process kill error
"""
# Close LMAO module and stop it's process
if self.name.startswith("lmao_"):
# We don't need to do anything if process is not running
with self._lmao_process_running.get_lock():
process_running = self._lmao_process_running.value
if not process_running:
return
# Read current status
with self._lmao_module_status.get_lock():
module_status = self._lmao_module_status.value
# Request stream stop and wait a bit
if module_status == STATUS_BUSY:
with self._lmao_stop_stream.get_lock():
self._lmao_stop_stream.value = True
time.sleep(1)
# Ask for process to stop
with self._lmao_process_running.get_lock():
self._lmao_process_running.value = False
# Wait or timeout
logging.info(f"Waiting for {self.name} process to stop")
time_started = time.time()
while self._lmao_process.is_alive():
if time.time() - time_started > _LMAO_STOP_TIMEOUT:
logging.info(f"Trying to kill {self.name} process")
self._lmao_process.kill()
break
time.sleep(LMAO_LOOP_DELAY)