Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added 'since' option to search for messages since a certain time #527

Merged
merged 10 commits into from
Nov 2, 2024
3 changes: 3 additions & 0 deletions docs/source/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ The full set of configuration options are:
- `check_timeout` - int: Number of seconds to wait for a IMAP
IDLE response or the number of seconds until the next
mail check (Default: `30`)
- `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`)
Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}).
Defaults to `1d` if incorrect value is provided.
- `imap`
- `host` - str: The IMAP server hostname or IP address
- `port` - int: The IMAP server port (Default: `993`)
Expand Down
71 changes: 64 additions & 7 deletions parsedmarc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from base64 import b64decode
from collections import OrderedDict
from csv import DictWriter
from datetime import datetime
from datetime import datetime, timedelta
from io import BytesIO, StringIO
from typing import Callable

Expand All @@ -28,7 +28,8 @@
from mailsuite.smtp import send_email

from parsedmarc.log import logger
from parsedmarc.mail import MailboxConnection
from parsedmarc.mail import MailboxConnection, IMAPConnection, \
MSGraphConnection, GmailConnection
from parsedmarc.utils import get_base_domain, get_ip_address_info
from parsedmarc.utils import is_outlook_msg, convert_outlook_msg
from parsedmarc.utils import parse_email
Expand Down Expand Up @@ -1371,6 +1372,7 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
strip_attachment_payloads=False,
results=None,
batch_size=10,
since=None,
create_folders=True):
"""
Fetches and parses DMARC reports from a mailbox
Expand All @@ -1393,6 +1395,8 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
results (dict): Results from the previous run
batch_size (int): Number of messages to read and process before saving
(use 0 for no limit)
since: Search for messages since certain time
(units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"})
create_folders (bool): Whether to create the destination folders
(not used in watch)

Expand All @@ -1405,6 +1409,9 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
if connection is None:
raise ValueError("Must supply a connection")

# current_time useful to fetch_messages later in the program
current_time = None

aggregate_reports = []
forensic_reports = []
smtp_tls_reports = []
Expand All @@ -1428,12 +1435,49 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
connection.create_folder(smtp_tls_reports_folder)
connection.create_folder(invalid_reports_folder)

messages = connection.fetch_messages(reports_folder, batch_size=batch_size)
if since:
_since = 1440 # default one day
if re.match(r'\d+[mhd]$', since):
s = re.split(r'(\d+)', since)
if s[2] == 'm':
_since = int(s[1])
elif s[2] == 'h':
_since = int(s[1])*60
elif s[2] == 'd':
_since = int(s[1])*60*24
elif s[2] == 'w':
_since = int(s[1])*60*24*7
else:
logger.warning("Incorrect format for \'since\' option. \
Provided value:{0}, Expected values:(5m|3h|2d|1w). \
Ignoring option, fetching messages for last 24hrs" \
"SMTP does not support a time or timezone in since." \
"See https://www.rfc-editor.org/rfc/rfc3501#page-52"
.format(since))

if isinstance(connection, IMAPConnection):
seanthegeek marked this conversation as resolved.
Show resolved Hide resolved
logger.debug("Only days and weeks values in \'since\' option are \
considered for IMAP conections. Examples: 2d or 1w")
since = (datetime.utcnow() - timedelta(minutes=_since)).date()
current_time = datetime.utcnow().date()
elif isinstance(connection, MSGraphConnection):
since = (datetime.utcnow() - timedelta(minutes=_since)) \
.isoformat() + 'Z'
current_time = datetime.utcnow().isoformat() + 'Z'
elif isinstance(connection, GmailConnection):
since = (datetime.utcnow() - timedelta(minutes=_since)) \
.strftime('%s')
current_time = datetime.utcnow().strftime('%s')
else:
pass

messages = connection.fetch_messages(reports_folder, batch_size=batch_size,
since=since)
total_messages = len(messages)
logger.debug("Found {0} messages in {1}".format(len(messages),
reports_folder))

if batch_size:
if batch_size and not since:
message_limit = min(total_messages, batch_size)
else:
message_limit = total_messages
Expand All @@ -1445,7 +1489,15 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
logger.debug("Processing message {0} of {1}: UID {2}".format(
i+1, message_limit, msg_uid
))
msg_content = connection.fetch_message(msg_uid)
if isinstance(mailbox, MSGraphConnection):
if test:
msg_content = connection.fetch_message(msg_uid,
mark_read=False)
else:
msg_content = connection.fetch_message(msg_uid,
mark_read=True)
else:
msg_content = connection.fetch_message(msg_uid)
try:
sa = strip_attachment_payloads
parsed_email = parse_report_email(
Expand Down Expand Up @@ -1564,7 +1616,11 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
("forensic_reports", forensic_reports),
("smtp_tls_reports", smtp_tls_reports)])

total_messages = len(connection.fetch_messages(reports_folder))
if current_time:
total_messages = len(connection.fetch_messages(reports_folder,
since=current_time))
else:
total_messages = len(connection.fetch_messages(reports_folder))

if not test and not batch_size and total_messages > 0:
# Process emails that came in during the last run
Expand All @@ -1582,7 +1638,8 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection,
always_use_local_files=always_use_local_files,
reverse_dns_map_path=reverse_dns_map_path,
reverse_dns_map_url=reverse_dns_map_url,
offline=offline
offline=offline,
since=current_time,
)

return results
Expand Down
4 changes: 4 additions & 0 deletions parsedmarc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ def process_reports(reports_):
mailbox_test=False,
mailbox_batch_size=10,
mailbox_check_timeout=30,
mailbox_since=None,
imap_host=None,
imap_skip_certificate_verification=False,
imap_ssl=True,
Expand Down Expand Up @@ -585,6 +586,8 @@ def process_reports(reports_):
if "check_timeout" in mailbox_config:
opts.mailbox_check_timeout = mailbox_config.getint(
"check_timeout")
if "since" in mailbox_config:
opts.mailbox_since = mailbox_config["since"]

if "imap" in config.sections():
imap_config = config["imap"]
Expand Down Expand Up @@ -1312,6 +1315,7 @@ def process_reports(reports_):
nameservers=opts.nameservers,
test=opts.mailbox_test,
strip_attachment_payloads=opts.strip_attachment_payloads,
since=opts.mailbox_since,
)

aggregate_reports += reports["aggregate_reports"]
Expand Down
44 changes: 32 additions & 12 deletions parsedmarc/mail/gmail.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,33 @@ def create_folder(self, folder_name: str):
else:
raise e

def _fetch_all_message_ids(self, reports_label_id, page_token=None):
results = (
self.service.users()
.messages()
.list(
userId="me",
includeSpamTrash=self.include_spam_trash,
labelIds=[reports_label_id],
pageToken=page_token,
def _fetch_all_message_ids(self, reports_label_id, page_token=None,
since=None):
if since:
results = (
self.service.users()
.messages()
.list(
userId="me",
includeSpamTrash=self.include_spam_trash,
labelIds=[reports_label_id],
pageToken=page_token,
q=f'after:{since}',
)
.execute()
)
else:
results = (
self.service.users()
.messages()
.list(
userId="me",
includeSpamTrash=self.include_spam_trash,
labelIds=[reports_label_id],
pageToken=page_token,
)
.execute()
)
.execute()
)
messages = results.get("messages", [])
for message in messages:
yield message["id"]
Expand All @@ -90,7 +105,12 @@ def _fetch_all_message_ids(self, reports_label_id, page_token=None):

def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]:
reports_label_id = self._find_label_id_for_label(reports_folder)
return [id for id in self._fetch_all_message_ids(reports_label_id)]
since = kwargs.get('since')
if since:
return [id for id in self._fetch_all_message_ids(reports_label_id,
since=since)]
else:
return [id for id in self._fetch_all_message_ids(reports_label_id)]

def fetch_message(self, message_id):
msg = self.service.users().messages()\
Expand Down
18 changes: 13 additions & 5 deletions parsedmarc/mail/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,22 @@ def fetch_messages(self, folder_name: str, **kwargs) -> List[str]:
folder_id = self._find_folder_id_from_folder_path(folder_name)
url = f'/users/{self.mailbox_name}/mailFolders/' \
f'{folder_id}/messages'
since = kwargs.get('since')
if not since:
since = None
batch_size = kwargs.get('batch_size')
if not batch_size:
batch_size = 0
emails = self._get_all_messages(url, batch_size)
emails = self._get_all_messages(url, batch_size, since)
return [email['id'] for email in emails]

def _get_all_messages(self, url, batch_size):
def _get_all_messages(self, url, batch_size, since):
messages: list
params = {
'$select': 'id'
}
if since:
params['$filter'] = f'receivedDateTime ge {since}'
if batch_size and batch_size > 0:
params['$top'] = batch_size
else:
Expand All @@ -165,8 +170,9 @@ def _get_all_messages(self, url, batch_size):
messages = result.json()['value']
# Loop if next page is present and not obtained message limit.
while '@odata.nextLink' in result.json() and (
since is not None or (
batch_size == 0 or
batch_size - len(messages) > 0):
batch_size - len(messages) > 0)):
result = self._client.get(result.json()['@odata.nextLink'])
if result.status_code != 200:
raise RuntimeError(f'Failed to fetch messages {result.text}')
Expand All @@ -181,13 +187,15 @@ def mark_message_read(self, message_id: str):
raise RuntimeWarning(f"Failed to mark message read"
f"{resp.status_code}: {resp.json()}")

def fetch_message(self, message_id: str):
def fetch_message(self, message_id: str, **kwargs):
url = f'/users/{self.mailbox_name}/messages/{message_id}/$value'
result = self._client.get(url)
if result.status_code != 200:
raise RuntimeWarning(f"Failed to fetch message"
f"{result.status_code}: {result.json()}")
self.mark_message_read(message_id)
mark_read = kwargs.get('mark_read')
if mark_read:
self.mark_message_read(message_id)
return result.text

def delete_message(self, message_id: str):
Expand Down
6 changes: 5 additions & 1 deletion parsedmarc/mail/imap.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ def create_folder(self, folder_name: str):

def fetch_messages(self, reports_folder: str, **kwargs):
self._client.select_folder(reports_folder)
return self._client.search()
since = kwargs.get('since')
if since:
return self._client.search([u'SINCE', since])
else:
return self._client.search()

def fetch_message(self, message_id):
return self._client.fetch_message(message_id, parse=False)
Expand Down