-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
344 lines (284 loc) · 11.9 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
from __future__ import annotations
from typing import Optional, Union
from pathlib import Path
from datetime import datetime, timedelta
from math import isinf
import asyncio
import traceback
import logging
import discord
from discord.ext import commands
from utils.memory import LongTermMemory, Settings
from utils.flotsam import Deck
from utils.logging import TaskTracker
from config import SWASHBOT_PREFIX, SWASHBOT_DATABASE
# TODO: if a message has a thread attached, delete it?
# Channel types the bot's helpers accept (text channels and threads).
SwashbotMessageable = Union[discord.TextChannel, discord.Thread]

# Gateway intents requested at login; only these three flags are enabled.
_swashbot_intents = discord.Intents(guilds=True, guild_messages=True, message_content=True)

# Bot theme colour: 0x2E898B is RGB (46, 137, 139).
_swashbot_color = discord.Colour(0x2E898B)

# Activity passed to the bot constructor as its initial presence.
_swashbot_login_activity = discord.Activity(name="the soft waves", type=discord.ActivityType.listening)

# Seconds to sleep after each message deletion attempt.
_swashbot_throttle_seconds = 0.85
class Swashbot(commands.Bot):
    """Represents our beloved ocean bot

    Attributes:
        color: Bot color theme
        ready: `datetime` of when bot first logged in successfully
        disconnects: the number of disconnects since ready
        messages_deleted: the number of messages deleted since ready
        errors: errors caught since ready
        busy_level: the number of channels Swashbot is currently performing busywork on
        commands_processed: the number of commands successfully processed
        memo: saved `~utils.memory.Settings` for channels
        decks: records of channels' messages for smart deletion
    """

    color: discord.Colour = _swashbot_color

    # analytics
    ready: Optional[datetime] = None
    disconnects: int = 0
    messages_deleted: int = 0
    errors: int = 0
    busy_level: int = 0
    commands_processed: int = 0

    def __init__(self) -> None:
        super().__init__(
            SWASHBOT_PREFIX,
            intents=_swashbot_intents,
            max_messages=None,
            activity=_swashbot_login_activity,
            status=discord.Status.online,
        )
        self.memo = LongTermMemory(Path(SWASHBOT_DATABASE))
        self.decks: dict[int, Deck] = {}
        self.log = logging.getLogger("swashbot")
        self.new_task = TaskTracker()

    async def setup_hook(self) -> None:
        """Load every ``*.py`` file found in ``./cogs`` as an extension
        """
        cogs = []
        for file in Path("./cogs").iterdir():
            if file.suffix == ".py":
                cog = f"cogs.{file.stem}"
                await self.load_extension(cog)
                cogs.append(cog)
        self.log.info(f"Loaded {len(cogs)} cog(s): {cogs}.")

    async def on_ready(self) -> None:
        """Gather decks for all remembered channels on the first ready event

        Any later ready event is only counted as a disconnect.
        """
        if self.ready:
            self.disconnects += 1
            return
        # Iterate over a snapshot: gather_flotsam may remove channels from memo.
        for channel in list(self.memo.settings):
            await self.gather_flotsam(channel)
        self.ready = datetime.utcnow()

    async def on_error(self, event_method: str, *args, **kwargs) -> None:
        """Count and log an exception raised inside an event handler

        Args:
            event_method: Name of the event that raised
        """
        self.errors += 1
        e = traceback.format_exc()
        summary = f"method: {event_method}\nargs: {args}\nkwargs: {kwargs}"
        details = f"```\n{e}\n```"
        self.log.error(f"Encountered error:\n{summary}\n{details}")

    async def on_command_error(self, ctx: commands.Context, error: commands.errors.CommandError) -> None:
        """Log a command error, react to the offending message, then defer to the default handler

        Args:
            ctx: Invocation context of the failed command
            error: The error that was raised
        """
        self.log.error(f"Unexpected command error:\n{error}")
        try:
            await ctx.message.add_reaction("👀")
        except Exception as reaction_error:
            # The reaction is best-effort only, but unlike a bare `except`
            # this no longer swallows task cancellation or KeyboardInterrupt.
            self.log.debug(f"Could not react to failed command: {reaction_error!r}")
        return await super().on_command_error(ctx, error)

    async def on_message(self, message: discord.Message) -> None:
        """Record a new message in its channel's deck, then process commands

        Messages are ignored entirely until the bot is ready.
        """
        if not self.ready:
            return
        channel = message.channel.id
        # A channel may have settings before its deck has been gathered; skip
        # recording then so that command processing below still runs.
        deck = self.decks.get(channel)
        if channel in self.memo.settings and deck is not None:
            deck.append_new(message)
        await self.process_commands(message)

    async def on_raw_message_delete(self, payload: discord.RawMessageDeleteEvent) -> None:
        """(Whenever a message deletion is detected)
        """
        deck = self.decks.get(payload.channel_id)
        if deck is None:
            return
        try:
            deck.remove(payload.message_id)
        except KeyError:
            # The message was not being tracked.
            pass

    async def on_raw_bulk_message_delete(self, payload: discord.RawBulkMessageDeleteEvent) -> None:
        """(Whenever a batch of messages has been detected as deleted)
        """
        deck = self.decks.get(payload.channel_id)
        if deck is None:
            return
        task = self.new_task()
        self.log.info(f"{task}: Handling bulk delete of {len(payload.message_ids)} message(s) in channel {payload.channel_id} (guild {payload.guild_id}).")
        for message in payload.message_ids:
            try:
                deck.remove(message)
            except KeyError:
                # The message was not being tracked.
                pass
        self.log.info(f"{task}: Done.")

    async def on_guild_remove(self, guild: discord.Guild) -> None:
        """Forget every watched channel of a guild the bot was removed from
        """
        if guild.id not in self.memo.channels:
            return
        # Snapshot the IDs: memo.remove() may mutate the collection being read.
        channels = list(self.memo.channels[guild.id])
        task = self.new_task()
        self.log.info(f"{task}: I was removed from a {guild.name!r} ({guild.id}), so I'll remove its {len(channels)} deck(s) from memory.")
        for channel in channels:
            self.memo.remove(channel)
            self.decks.pop(channel, None)
        self.log.info(f"{task}: Done.")

    async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel) -> None:
        """Forget a watched channel once it has been deleted
        """
        # Settings are looked up by channel ID everywhere else in this class,
        # so test the ID rather than the channel object.
        if channel.id in self.memo.settings:
            self.log.info(f"The channel {channel.name!r} ({channel.id}) I was watching was deleted, so I'll remove its deck from memory.")
            self.memo.remove(channel.id)
            self.decks.pop(channel.id, None)

    async def on_raw_thread_delete(self, payload: discord.RawThreadDeleteEvent) -> None:
        """Forget a watched thread once it has been deleted
        """
        if payload.thread_id in self.memo.settings:
            self.log.info(f"The thread {payload.thread_id} I was watching was deleted, so I'll remove its deck from memory.")
            self.memo.remove(payload.thread_id)
            self.decks.pop(payload.thread_id, None)

    async def on_command_completion(self, ctx: commands.Context) -> None:
        """Count and log a successfully completed command
        """
        self.commands_processed += 1
        # The first two positional arguments are left out of the log
        # (presumably the cog instance and the context - TODO confirm).
        args = ", ".join(repr(arg) for arg in ctx.args[2:])
        means = f"{ctx.message.content!r}" if ctx.message.content else "a slash command"
        summary = (
            f"``{ctx.command}({args})`` invoked by "
            f"{ctx.author} with {means}"
        )
        self.log.debug(f"Completed processing command #{self.commands_processed}: {summary}.")

    @property
    def uptime(self) -> str:
        """Human-readable uptime

        Returns:
            str: Space-separated parts such as ``"1d 0h 5m 7s"``. Leading
            zero-valued units are omitted and seconds are always shown. A
            "month" is a block of 30 days and shares the ``m`` suffix with
            minutes.
        """
        age = timedelta() if self.ready is None else (datetime.utcnow() - self.ready)
        hours, remainder = divmod(int(age.total_seconds()), 3600)
        minutes, seconds = divmod(remainder, 60)
        days, hours = divmod(hours, 24)
        months, days = divmod(days, 30)
        parts = []
        if months: parts.append(f"{months}m")
        if months or days: parts.append(f"{days}d")
        if months or days or hours: parts.append(f"{hours}h")
        if months or days or hours or minutes: parts.append(f"{minutes}m")
        parts.append(f"{seconds}s")
        return " ".join(parts)

    @property
    def deletion_rate(self) -> str:
        """Messages washed away per unit time

        Returns:
            str: ``"n/a"`` when no time has elapsed or nothing was deleted,
            a ``msg/s`` rate when more than one message per second was
            deleted, otherwise the average number of seconds per message.
        """
        age = timedelta() if self.ready is None else (datetime.utcnow() - self.ready)
        seconds = max(0, age.total_seconds())
        if seconds == 0 or self.messages_deleted == 0:
            return "n/a"
        if self.messages_deleted > seconds:
            return f"{self.messages_deleted / seconds:.2f} msg/s"
        return f"1 message every {seconds / self.messages_deleted:.2f}s"

    async def try_channel(self, channel: int) -> SwashbotMessageable:
        """Return full Discord channel object given channel ID

        Args:
            channel: Channel ID

        Raises:
            AssertionError: If the fetched channel is not a text channel or thread.
            Exceptions raised by ``fetch_channel`` are not caught here.
        """
        task = self.new_task()
        self.log.debug(f"{task}: Trying to fetch channel {channel}...")
        discord_channel = await self.fetch_channel(channel)
        assert isinstance(discord_channel, SwashbotMessageable)
        self.log.debug(f"{task}: Done.")
        return discord_channel

    async def gather_flotsam(self, channel: int) -> int:
        """Keep a record of messages in a channel

        Builds a fresh deck from the channel's history (pinned messages are
        skipped) and stores it in `decks`. If the channel no longer exists it
        is removed from memory instead.

        Args:
            channel: Channel ID

        Returns:
            int: Number of messages gathered.
        """
        task = self.new_task()
        self.log.info(f"{task}: Gathering flotsam for channel {channel}...")
        start = datetime.utcnow()

        settings = self.memo.settings[channel]
        if not settings:
            return 0

        try:
            discord_channel = await self.try_channel(channel)
        except discord.NotFound:
            if channel in self.memo.settings:
                self.memo.remove(channel)
            return 0

        deck = Deck()
        # No limit for an infinite `at_most`; otherwise fetch ten more than it.
        limit = None if isinf(settings.at_most) else int(settings.at_most + 10)
        async for message in discord_channel.history(limit=limit):
            if message.pinned:
                continue
            deck.append_old(message)
        self.decks[channel] = deck

        # Split the whole elapsed seconds into minutes and leftover seconds.
        elapsed = int((datetime.utcnow() - start).total_seconds())
        minutes, seconds = divmod(elapsed, 60)
        time = f"{minutes}m {seconds}s" if minutes else f"{seconds}s"
        self.log.info(f"{task}: Finished gathering flotsam for {discord_channel.name!r} ({channel}) (about {len(deck)} messages(s) after {time}).")
        return len(deck)

    async def try_delete(self, discord_channel: SwashbotMessageable, id: int) -> None:
        """Attempt to delete a single message

        Pinned messages are left alone, and a message that cannot be found
        or may not be deleted is skipped silently.

        Args:
            discord_channel: Full Discord channel object
            id: Discord message ID
        """
        try:
            message = await discord_channel.fetch_message(id)
        except discord.NotFound:
            self.log.debug(f"Message {id} was not found.")
            return
        if message.pinned:
            return
        # Yield to the event loop until the gateway rate limit has cleared.
        while self.is_ws_ratelimited():
            await asyncio.sleep(0)
        try:
            await message.delete()
        except discord.NotFound:
            # Someone else deleted it between our fetch and our delete.
            self.log.debug(f"Message {id} was not found.")
        except discord.Forbidden:
            pass
        else:
            self.messages_deleted += 1
        await asyncio.sleep(_swashbot_throttle_seconds)

    async def delete_messages(self, channel: int, *, limit: int, beside: Optional[int]=None) -> None:
        """Delete a number of a channel's most recent messages

        Pinned messages are skipped but still count towards `limit`.

        Args:
            channel: Channel ID

        Keyword Args:
            limit: Integer number of messages to delete
            beside: Message ID of message to avoid deleting
        """
        # Look one message further back to make up for the one we skip.
        if beside is not None:
            limit += 1
        discord_channel = await self.try_channel(channel)
        async for message in discord_channel.history(limit=limit):
            if message.id == beside:
                continue
            if message.pinned:
                continue
            try:
                await message.delete()
            except (discord.NotFound, discord.Forbidden):
                # Already gone, or not ours to delete: move on to the next one.
                pass
            else:
                self.messages_deleted += 1
            await asyncio.sleep(_swashbot_throttle_seconds)

    async def check_permissions(self, channel: SwashbotMessageable, required: discord.Permissions, *,
        inform: Optional[discord.Message]=None
    ) -> bool:
        """Check if we have the required permissions to continue

        If we do, returns True, otherwise False. Swashbot will also try to
        communicate what permissions are required.

        Parameters:
            channel: The Discord channel to check if we have permissions for
            required: Permissions required to return True
            inform: The Message to try to contact if we don't have enough permissions
        """
        if self.user is None:
            return False

        self_member = channel.guild.get_member(self.user.id)
        if self_member is None:
            self.log.warning((
                f"I tried to check my permissions for {channel.name!r} ({channel.id}) in "
                f"{channel.guild.name!r} ({channel.guild.id}), but it seems I'm not in that guild?"
            ))
            return False

        perms = channel.permissions_for(self_member)
        if required <= perms:
            return True

        required_names = {perm for perm, value in required if value}
        granted_names = {perm for perm, value in perms if value}
        # Sorted so the message reads the same way every time.
        missing = ", ".join(
            f"**{perm}**"
            for perm in sorted(required_names - granted_names)
        )

        if inform is not None:
            inform_channel_perms = inform.channel.permissions_for(self_member)
            msg = f"I need the following permission(s): {missing} 🙏"
            if inform_channel_perms.send_messages:
                # Type where the reply will appear; that is the channel whose
                # send permission was just checked, unlike `channel`.
                async with inform.channel.typing():
                    await asyncio.sleep(1)
                if inform_channel_perms.read_message_history:
                    await inform.reply(msg)
                else:
                    await inform.channel.send(msg)
            elif inform_channel_perms.add_reactions:
                await inform.add_reaction("🙏")
        return False