Skip to content

Commit

Permalink
change the nameing conventions for class and variables to genaralise …
Browse files Browse the repository at this point in the history
…email
  • Loading branch information
mohanqxf2 committed Sep 18, 2024
1 parent 801573b commit bc9a32f
Show file tree
Hide file tree
Showing 9 changed files with 881 additions and 1 deletion.
21 changes: 21 additions & 0 deletions integrations/reporting_channels/email/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@


"""
GMail! Woo!
"""

__title__ = 'gmail'
__version__ = '0.1'
__author__ = 'Charlie Guo'
__build__ = 0x0001
__license__ = 'Apache 2.0'
__copyright__ = 'Copyright 2013 Charlie Guo'

from .email import Email
from .mailbox import Mailbox
from .message import Message
from .exceptions import EmailException, ConnectionError, AuthenticationError
from .utils import login, authenticate

215 changes: 215 additions & 0 deletions integrations/reporting_channels/email/email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
from __future__ import absolute_import
import os
import sys
import re
import imaplib
import logging
from email.header import decode_header
from dotenv import load_dotenv
from integrations.reporting_channels.email.mailbox import Mailbox
from integrations.reporting_channels.email.utf import encode as encode_utf7, decode as decode_utf7
from integrations.reporting_channels.email.exceptions import *
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
load_dotenv()


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

class Email():
# EMail IMAP defaults
IMAP_HOST = os.getenv('imaphost')
IMAP_PORT = 993

# EMail SMTP defaults
EMAIL_SMTP_HOST = os.getenv('smtp_ssl_host')
EMAIL_SMTP_PORT = os.getenv('smtp_ssl_port')

def __init__(self):
self.username = None
self.password = None
self.access_token = None

self.imap = None
self.smtp = None
self.logged_in = False
self.mailboxes = {}
self.current_mailbox = None


# self.connect()


def connect(self, raise_errors=True):
# try:
# self.imap = imaplib.IMAP4_SSL(self.GMAIL_IMAP_HOST, self.GMAIL_IMAP_PORT)
# except socket.error:
# if raise_errors:
# raise Exception('Connection failure.')
# self.imap = None

self.imap = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT)

# self.smtp = smtplib.SMTP(self.server,self.port)
# self.smtp.set_debuglevel(self.debug)
# self.smtp.ehlo()
# self.smtp.starttls()
# self.smtp.ehlo()

return self.imap


# Add fetch_mailboxes method in the Email class
def fetch_mailboxes(self):
response, mailbox_list = self.imap.list()
if response == 'OK':
mailbox_list = [item.decode('utf-8') if isinstance(item, bytes) else item for item in mailbox_list]
for mailbox in mailbox_list:
mailbox_name = mailbox.split('"/"')[-1].replace('"', '').strip()
mailbox = Mailbox(self)
mailbox.external_name = mailbox_name
self.mailboxes[mailbox_name] = mailbox
return list(self.mailboxes.keys())
else:
raise Exception("Failed to fetch mailboxes.")


def use_mailbox(self, mailbox):
if mailbox:
self.imap.select(mailbox)
self.current_mailbox = mailbox
return Mailbox(self, mailbox)


def mailbox(self, mailbox_name):
if mailbox_name not in self.mailboxes:
mailbox_name = encode_utf7(mailbox_name)
mailbox = self.mailboxes.get(mailbox_name)
if mailbox and not self.current_mailbox == mailbox_name:
self.use_mailbox(mailbox_name)

return mailbox

def create_mailbox(self, mailbox_name):
mailbox = self.mailboxes.get(mailbox_name)
if not mailbox:
self.imap.create(mailbox_name)
mailbox = Mailbox(self, mailbox_name)
self.mailboxes[mailbox_name] = mailbox

return mailbox

def delete_mailbox(self, mailbox_name):
mailbox = self.mailboxes.get(mailbox_name)
if mailbox:
self.imap.delete(mailbox_name)
del self.mailboxes[mailbox_name]



def login(self, username, password):
self.username = username
self.password = password

if not self.imap:
self.connect()

try:
imap_login = self.imap.login(self.username, self.password)
self.logged_in = (imap_login and imap_login[0] == 'OK')
if self.logged_in:
self.fetch_mailboxes()
except imaplib.IMAP4.error:
raise AuthenticationError


# smtp_login(username, password)

return self.logged_in

def authenticate(self, username, access_token):
self.username = username
self.access_token = access_token

if not self.imap:
self.connect()

try:
auth_string = 'user=%s\1auth=Bearer %s\1\1' % (username, access_token)
imap_auth = self.imap.authenticate('XOAUTH2', lambda x: auth_string)
self.logged_in = (imap_auth and imap_auth[0] == 'OK')
if self.logged_in:
self.fetch_mailboxes()
except imaplib.IMAP4.error:
raise AuthenticationError

return self.logged_in

def logout(self):
self.imap.logout()
self.logged_in = False


def label(self, label_name):
return self.mailbox(label_name)

def find(self, mailbox_name="[Gmail]/All Mail", **kwargs):
box = self.mailbox(mailbox_name)
return box.mail(**kwargs)


def copy(self, uid, to_mailbox, from_mailbox=None):
if from_mailbox:
self.use_mailbox(from_mailbox)
self.imap.uid('COPY', uid, to_mailbox)

def fetch_multiple_messages(self, messages):
if not isinstance(messages, dict):
raise Exception('Messages must be a dictionary')

fetch_str = ','.join(messages.keys())
response, results = self.imap.uid('FETCH', fetch_str, '(UID BODY.PEEK[] FLAGS)')

for raw_message in results:
if isinstance(raw_message, tuple):
uid_match = re.search(rb'UID (\d+)', raw_message[0])
if uid_match:
uid = uid_match.group(1).decode('utf-8')
if uid in messages:
messages[uid].parse(raw_message)
else:
logging.warning(f'UID {uid} not found in messages dictionary')
else:
logging.warning('UID not found in raw message')
elif isinstance(raw_message, bytes):
continue
else:
logging.warning('Invalid raw message format')

return messages

def labels(self, require_unicode=False):
keys = self.mailboxes.keys()
if require_unicode:
keys = [decode_utf7(key) for key in keys]
return keys

def inbox(self):
return self.mailbox("INBOX")

def spam(self):
return self.mailbox("[Gmail]/Spam")

def starred(self):
return self.mailbox("[Gmail]/Starred")

def all_mail(self):
return self.mailbox("[Gmail]/All Mail")

def sent_mail(self):
return self.mailbox("[Gmail]/Sent Mail")

def important(self):
return self.mailbox("[Gmail]/Important")

def mail_domain(self):
return self.username.split('@')[-1]
23 changes: 23 additions & 0 deletions integrations/reporting_channels/email/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-

"""
email.exceptions
~~~~~~~~~~~~~~~~~~~
This module contains the set of Emails' exceptions.
"""


class EmailException(RuntimeError):
"""There was an ambiguous exception that occurred while handling your
request."""

class ConnectionError(EmailException):
"""A Connection error occurred."""

class AuthenticationError(EmailException):
"""Email Authentication failed."""

class Timeout(EmailException):
"""The request timed out."""
105 changes: 105 additions & 0 deletions integrations/reporting_channels/email/mailbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import re
from .message import Message
from .utf import encode as encode_utf7, decode as decode_utf7

class Mailbox():
def __init__(self, email, name="INBOX"):
self.name = name
self.email = email
self.date_format = "%d-%b-%Y"
self.messages = {}

@property
def external_name(self):
if "external_name" not in vars(self):
vars(self)["external_name"] = encode_utf7(self.name)
return vars(self)["external_name"]

@external_name.setter
def external_name(self, value):
if "external_name" in vars(self):
del vars(self)["external_name"]
self.name = decode_utf7(value)

def mail(self, prefetch=False, **kwargs):
search = ['ALL']

kwargs.get('read') and search.append('SEEN')
kwargs.get('unread') and search.append('UNSEEN')

kwargs.get('starred') and search.append('FLAGGED')
kwargs.get('unstarred') and search.append('UNFLAGGED')

kwargs.get('deleted') and search.append('DELETED')
kwargs.get('undeleted') and search.append('UNDELETED')

kwargs.get('draft') and search.append('DRAFT')
kwargs.get('undraft') and search.append('UNDRAFT')

kwargs.get('before') and search.extend(['BEFORE', kwargs.get('before').strftime(self.date_format)])
kwargs.get('after') and search.extend(['SINCE', kwargs.get('after').strftime(self.date_format)])
kwargs.get('on') and search.extend(['ON', kwargs.get('on').strftime(self.date_format)])

kwargs.get('header') and search.extend(['HEADER', kwargs.get('header')[0], kwargs.get('header')[1]])

kwargs.get('sender') and search.extend(['FROM', kwargs.get('sender')])
kwargs.get('fr') and search.extend(['FROM', kwargs.get('fr')])
kwargs.get('to') and search.extend(['TO', kwargs.get('to')])
kwargs.get('cc') and search.extend(['CC', kwargs.get('cc')])

kwargs.get('subject') and search.extend(['SUBJECT', kwargs.get('subject')])
kwargs.get('body') and search.extend(['BODY', kwargs.get('body')])

kwargs.get('label') and search.extend(['X-GM-LABELS', kwargs.get('label')])
kwargs.get('attachment') and search.extend(['HAS', 'attachment'])

kwargs.get('query') and search.extend([kwargs.get('query')])

emails = []
search_criteria = ' '.join(search).encode('utf-8') # Ensure the search criteria are byte strings

response, data = self.email.imap.uid('SEARCH', None, search_criteria)
if response == 'OK':
uids = filter(None, data[0].split(b' ')) # filter out empty strings

for uid in uids:
if not self.messages.get(uid):
self.messages[uid] = Message(self, uid)
emails.append(self.messages[uid])

if prefetch and emails:
messages_dict = {}
for email in emails:
messages_dict[email.uid] = email
self.messages.update(self.email.fetch_multiple_messages(messages_dict))

return emails

# WORK IN PROGRESS. NOT FOR ACTUAL USE
def threads(self, prefetch=False, **kwargs):
emails = []
response, data = self.email.imap.uid('SEARCH', None, 'ALL'.encode('utf-8'))
if response == 'OK':
uids = data[0].split(b' ')

for uid in uids:
if not self.messages.get(uid):
self.messages[uid] = Message(self, uid)
emails.append(self.messages[uid])

if prefetch:
fetch_str = ','.join(uids).encode('utf-8')
response, results = self.email.imap.uid('FETCH', fetch_str, '(BODY.PEEK[] FLAGS X-GM-THRID X-GM-MSGID X-GM-LABELS)')
for index in range(len(results) - 1):
raw_message = results[index]
if re.search(rb'UID (\d+)', raw_message[0]):
uid = re.search(rb'UID (\d+)', raw_message[0]).groups(1)[0]
self.messages[uid].parse(raw_message)

return emails

def count(self, **kwargs):
return len(self.mail(**kwargs))

def cached_messages(self):
return self.messages
Loading

0 comments on commit bc9a32f

Please sign in to comment.