Skip to content

Commit

Permalink
Variable Typing + discord.Embed
Browse files Browse the repository at this point in the history
Add in true notifications for the discord bot so that people can actually see stuff about them.
  • Loading branch information
SocksTheWolf committed Sep 29, 2023
1 parent f1c3227 commit 91e9dc9
Showing 1 changed file with 74 additions and 26 deletions.
100 changes: 74 additions & 26 deletions Main.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def PostNotification(self, Message:str):
except discord.HTTPException as ex:
Logger.Log(LogLevel.Log, f"WARN: Unable to send message to notification channel {str(ex)}")

async def PublishAnnouncement(self, Message):
async def PublishAnnouncement(self, Message:str|discord.Embed):
try:
NewMessage = None
if (type(Message) == discord.Embed):
Expand All @@ -150,7 +150,43 @@ async def PublishAnnouncement(self, Message):
Logger.Log(LogLevel.Error, f"Could not publish message, as it did not send!")
except discord.HTTPException as ex:
Logger.Log(LogLevel.Log, f"WARN: Unable to publish message to announcement channel {str(ex)}")


async def LookupUser(self, UserID:int) -> discord.User|None:
try:
return await self.fetch_user(UserID)
except (discord.NotFound):
Logger.Log(LogLevel.Warn, f"UserID {UserID} was not found with error")
except discord.HTTPException as httpEx:
Logger.Log(LogLevel.Warn, f"Failed to fetch user {UserID}, got {str(httpEx)}")
return None

async def CreateBanEmbed(self, TargetId:int) -> discord.Embed:
BanData = self.GetBanInfo(TargetId)
UserBanned:bool = (BanData is not None)
User:discord.User = await self.LookupUser(TargetId)
HasUserData:bool = (User is not None)
UserData = discord.Embed(title="User Data")
if (HasUserData):
UserData.add_field(name="User", value=User.display_name)
UserData.add_field(name="User Handle", value=User.mention)
UserData.set_thumbnail(url=User.display_avatar.url)

UserData.add_field(name="Banned", value=f"{UserBanned}")

# Figure out who banned them
if (UserBanned):
# BannerName, BannerId, Date
UserData.add_field(name="Banned By", value=f"{BanData[0]}")
UserData.add_field(name="Time", value=f"{BanData[2]}", inline=False)
UserData.colour = discord.Colour.red()
elif (not HasUserData):
UserData.colour = discord.Colour.dark_orange()
else:
UserData.colour = discord.Colour.green()

UserData.set_footer(text=f"User ID: {TargetId}")
return UserData

### Event Queueing ###
def AddAsyncTask(self, TaskToComplete):
try:
Expand All @@ -163,18 +199,18 @@ def AddAsyncTask(self, TaskToComplete):
NewTask.add_done_callback(self.AsyncTasks.discard)

### Handling Server Database functionality ###
def IsInServer(self, serverid:int) -> bool:
res = self.Database.execute(f"SELECT * FROM servers WHERE Id={serverid}")
def IsInServer(self, ServerId:int) -> bool:
res = self.Database.execute(f"SELECT * FROM servers WHERE Id={ServerId}")
if (res.fetchone() is None):
return False
else:
return True

def IsActivatedInServer(self, serverid:int) -> bool:
if (not self.IsInServer(serverid)):
def IsActivatedInServer(self, ServerId:int) -> bool:
if (not self.IsInServer(ServerId)):
return False

res = self.Database.execute(f"SELECT Activated FROM servers WHERE Id={serverid}")
res = self.Database.execute(f"SELECT Activated FROM servers WHERE Id={ServerId}")
FetchResult = res.fetchone()
if (FetchResult[0] == 0):
return False
Expand Down Expand Up @@ -239,11 +275,11 @@ async def on_ready(self):
self.UpdateServerDB()
Logger.Log(LogLevel.Notice, "Bot has started!")

async def on_guild_join(self, server):
async def on_guild_join(self, server:discord.Guild):
self.SetBotActivationForOwner(server.owner_id, [server.id], False)
Logger.Log(LogLevel.Notice, f"Bot has joined server {server.name} [{server.id}] of owner {server.owner.name} [{server.owner.id}]")

async def on_guild_remove(self, server):
async def on_guild_remove(self, server:discord.Guild):
if (self.IsInServer(server.id)):
self.Database.execute(f"DELETE FROM servers where Id={server.id}")
self.Database.commit()
Expand Down Expand Up @@ -294,7 +330,7 @@ async def on_message(self, message):
Logger.Log(LogLevel.Debug, f"There is no guild for this server instance!")
return

Sender = message.author
Sender:discord.Member = message.author
# Senders must also be discord server members, not random users
if (type(Sender) is not discord.Member):
Logger.Log(LogLevel.Debug, f"User is not a discord member, this must be a DM")
Expand Down Expand Up @@ -423,14 +459,15 @@ async def on_message(self, message):
for BotServers in QueryResults:
ReplyStr += f"#{RowNum}: Server {BotServers[0]}, Owner {BotServers[1]}, Activated {str(bool(BotServers[2]))}\n"
RowNum += 1
await message.reply(f"{ReplyStr}\nNumServers DB: {len(QueryResults)} Discord: {len(self.guilds)}")
await message.reply(f"{ReplyStr}\nNumServers DB: {len(QueryResults)} | Discord: {len(self.guilds)}")
return

if (Command.startswith("?scamcheck")):
if (self.DoesBanExist(TargetId)):
await message.reply(f"{TargetId} is currently banned")
if (self.IsActivatedInServer(message.guild.id)):
ResponseEmbed:discord.Embed = await self.CreateBanEmbed(TargetId)
await message.reply(embed = ResponseEmbed)
else:
await message.reply(f"{TargetId} is not currently banned")
await message.reply("You must activate your server to use commands!")
elif (Command.startswith("?commands")):
CommandResponse:str = "The list of commands are: \n"
for CommandData in self.CommandList:
Expand All @@ -448,19 +485,23 @@ def DoesBanExist(self, TargetId:int) -> bool:
return False
else:
return True

# Returns the banner's name, the id and the date
def GetBanInfo(self, TargetId:int):
return self.Database.execute(f"SELECT BannerName, BannerId, Date FROM banslist WHERE Id={TargetId}").fetchone()

async def ReprocessBansForServer(self, Server):
Logger.Log(LogLevel.Log, f"Attempting to reimport ban data to {Server.name}")
async def ReprocessBansForServer(self, Server:discord.Guild):
Logger.Log(LogLevel.Log, f"Attempting to import ban data to {Server.name}")
NumBans:int = 0
BanQuery = self.Database.execute(f"SELECT Id FROM banslist")
BanResult = BanQuery.fetchall()
for Ban in BanResult:
User = discord.Object(int(Ban[0]))
NumBans += 1
await self.PerformActionOnServer(Server, User, "User banned by ScamBot", True)
Logger.Log(LogLevel.Notice, f"Processed {NumBans} bans for {Server.name}!")
Logger.Log(LogLevel.Notice, f"Processed {NumBans} bans for {Server.name}[{Server.id}]!")

async def PrepareBan(self, TargetId:int, Sender) -> BanLookup:
async def PrepareBan(self, TargetId:int, Sender:discord.Member) -> BanLookup:
try:
if (self.DoesBanExist(TargetId)):
return BanLookup.Duplicate
Expand All @@ -472,15 +513,17 @@ async def PrepareBan(self, TargetId:int, Sender) -> BanLookup:
# Add to scammer database
data = [(TargetId, Sender.name, Sender.id, datetime.now())]
self.Database.executemany("INSERT INTO banslist VALUES(?, ?, ?, ?)", data)
self.Database.commit()
self.Database.commit()
self.AddAsyncTask(self.PropagateActionToServers(TargetId, Sender, True))

# Send a message to the announcement channel
await self.PublishAnnouncement(f"A ban of user {TargetId} was committed by {Sender.display_name}")
NewAnnouncement:discord.Embed = await self.CreateBanEmbed(TargetId)
NewAnnouncement.title="Ban in Progress"
await self.PublishAnnouncement(NewAnnouncement)

return BanLookup.Banned

async def PrepareUnban(self, TargetId:int, Sender) -> BanLookup:
async def PrepareUnban(self, TargetId:int, Sender:discord.Member) -> BanLookup:
try:
if (not self.DoesBanExist(TargetId)):
return BanLookup.NotExist
Expand All @@ -491,10 +534,15 @@ async def PrepareUnban(self, TargetId:int, Sender) -> BanLookup:
self.Database.execute(f"DELETE FROM banslist where Id={TargetId}")
self.Database.commit()
self.AddAsyncTask(self.PropagateActionToServers(TargetId, Sender, False))
await self.PublishAnnouncement(f"An unban of user {TargetId} was committed by {Sender.display_name}")

# Send a message to the announcement channel
NewAnnouncement:discord.Embed = await self.CreateBanEmbed(TargetId)
NewAnnouncement.title = "Unban in Progress"
await self.PublishAnnouncement(NewAnnouncement)

return BanLookup.Unbanned
async def PerformActionOnServer(self, Server, User, Reason, IsBan:bool) -> (bool, BanResult):

async def PerformActionOnServer(self, Server:discord.Guild, User:discord.Member, Reason:str, IsBan:bool) -> (bool, BanResult):
try:
BanStr:str = "ban"
if (not IsBan):
Expand All @@ -520,7 +568,7 @@ async def PerformActionOnServer(self, Server, User, Reason, IsBan:bool) -> (bool
Logger.Log(LogLevel.Log, f"We encountered an error {(str(ex))} while trying to perform for server {Server.name} owned by {Server.owner_id}!")
return (False, BanResult.Error)

async def PropagateActionToServers(self, TargetId:int, Sender, IsBan:bool):
async def PropagateActionToServers(self, TargetId:int, Sender:discord.Member, IsBan:bool):
NumServersPerformed:int = 0
UserToWorkOn = discord.Object(TargetId)
ScamStr:str = "scammer"
Expand Down

0 comments on commit 91e9dc9

Please sign in to comment.