From 15c0fd970a2a7c233617f048b6a2df4185dc2114 Mon Sep 17 00:00:00 2001 From: lingfish Date: Fri, 28 Jun 2024 10:54:23 +1000 Subject: [PATCH] Implement the ability to log to a GELF server/input, via the use of pygelf. --- docs/source/usage.md | 9 +++++++ parsedmarc/cli.py | 56 +++++++++++++++++++++++++++++++++++++-- parsedmarc/gelf.py | 62 ++++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 1 + 4 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 parsedmarc/gelf.py diff --git a/docs/source/usage.md b/docs/source/usage.md index 8b756b3f..11d479e5 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -98,6 +98,11 @@ path = parsedmarc [syslog] server = localhost port = 514 + +[gelf] +host = logger +port = 12201 +mode = tcp ``` The full set of configuration options are: @@ -343,6 +348,10 @@ The full set of configuration options are: :::{note} Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal). ::: +- `gelf` + - `host` - str: The GELF server name or IP address + - `port` - int: The port to use + - `mode` - str: The GELF transport type to use. Valid modes: `tcp`, `udp`, `tls` :::{warning} It is **strongly recommended** to **not** use the `nameservers` diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index d2ebae61..5f598646 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -19,7 +19,7 @@ from parsedmarc import get_dmarc_reports_from_mailbox, watch_inbox, \ parse_report_file, get_dmarc_reports_from_mbox, elastic, opensearch, \ kafkaclient, splunk, save_output, email_results, ParserError, \ - __version__, InvalidDMARCReport, s3, syslog, loganalytics + __version__, InvalidDMARCReport, s3, syslog, loganalytics, gelf from parsedmarc.mail import IMAPConnection, MSGraphConnection, GmailConnection from parsedmarc.mail.graph import AuthMethod @@ -145,6 +145,12 @@ def process_reports(reports_): except Exception as error_: logger.error("Syslog Error: {0}".format(error_.__str__())) + try: + if opts.gelf_host: + gelf_client.save_aggregate_report_to_gelf(report) + except Exception as error_: + logger.error("GELF Error: {0}".format(error_.__str__())) + if opts.hec: try: aggregate_reports_ = reports_["aggregate_reports"] @@ -212,6 +218,12 @@ def process_reports(reports_): except Exception as error_: logger.error("Syslog Error: {0}".format(error_.__str__())) + try: + if opts.gelf_host: + gelf_client.save_forensic_report_to_gelf(report) + except Exception as error_: + logger.error("GELF Error: {0}".format(error_.__str__())) + if opts.hec: try: forensic_reports_ = reports_["forensic_reports"] @@ -279,6 +291,12 @@ def process_reports(reports_): except Exception as error_: logger.error("Syslog Error: {0}".format(error_.__str__())) + try: + if opts.gelf_host: + gelf_client.save_smtp_tls_report_to_gelf(report) + except Exception as error_: + logger.error("GELF Error: {0}".format(error_.__str__())) + if opts.hec: try: smtp_tls_reports_ = reports_["smtp_tls_reports"] @@ -491,7 +509,10 @@ def process_reports(reports_): la_dcr_immutable_id=None, la_dcr_aggregate_stream=None, la_dcr_forensic_stream=None, - la_dcr_smtp_tls_stream=None + la_dcr_smtp_tls_stream=None, + gelf_host=None, + gelf_port=None, + gelf_mode=None, ) args = arg_parser.parse_args() @@ -992,6 +1013,27 @@ def process_reports(reports_): opts.la_dcr_smtp_tls_stream = \ log_analytics_config.get("dcr_smtp_tls_stream") + if "gelf" in config.sections(): + gelf_config = config["gelf"] + if "host" in gelf_config: + opts.gelf_host = gelf_config["host"] + else: + logger.critical("host setting missing from the " + "gelf config section") + exit(-1) + if "port" in gelf_config: + opts.gelf_port = gelf_config["port"] + else: + logger.critical("port setting missing from the " + "gelf config section") + exit(-1) + if "mode" in gelf_config: + opts.gelf_mode = gelf_config["mode"] + else: + logger.critical("mode setting missing from the " + "gelf config section") + exit(-1) + logger.setLevel(logging.ERROR) if opts.warnings: @@ -1130,6 +1172,16 @@ def process_reports(reports_): except Exception as error_: logger.error("Kafka Error: {0}".format(error_.__str__())) + if opts.gelf_host: + try: + gelf_client = gelf.GelfClient( + host=opts.gelf_host, + port=int(opts.gelf_port), + mode=opts.gelf_mode, + ) + except Exception as error_: + logger.error("GELF Error: {0}".format(error_.__str__())) + kafka_aggregate_topic = opts.kafka_aggregate_topic kafka_forensic_topic = opts.kafka_forensic_topic kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic diff --git a/parsedmarc/gelf.py b/parsedmarc/gelf.py new file mode 100644 index 00000000..3f745104 --- /dev/null +++ b/parsedmarc/gelf.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- + +import logging +import logging.handlers +import json +import threading + +from parsedmarc import parsed_aggregate_reports_to_csv_rows, \ + parsed_forensic_reports_to_csv_rows, parsed_smtp_tls_reports_to_csv_rows +from pygelf import GelfTcpHandler, GelfUdpHandler, GelfTlsHandler + + +log_context_data = threading.local() + +class ContextFilter(logging.Filter): + + def filter(self, record): + record.parsedmarc = log_context_data.parsedmarc + return True + + +class GelfClient(object): + """A client for the Graylog Extended Log Format""" + + def __init__(self, host, port, mode): + """ + Initializes the GelfClient + Args: + host (str): The GELF host + port (int): The GELF port + mode (str): The GELF transport mode + """ + self.host = host + self.port = port + self.logger = logging.getLogger('parsedmarc_syslog') + self.logger.setLevel(logging.INFO) + self.logger.addFilter(ContextFilter()) + self.gelf_mode = { + 'udp': GelfUdpHandler, + 'tcp': GelfTcpHandler, + 'tls': GelfTlsHandler, + } + self.handler = self.gelf_mode[mode](host=self.host, port=self.port, include_extra_fields=True) + self.logger.addHandler(self.handler) + + def save_aggregate_report_to_gelf(self, aggregate_reports): + rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports) + for row in rows: + log_context_data.parsedmarc = row + self.logger.info('parsedmarc aggregate report') + + log_context_data.parsedmarc = None + + def save_forensic_report_to_gelf(self, forensic_reports): + rows = parsed_forensic_reports_to_csv_rows(forensic_reports) + for row in rows: + self.logger.info(json.dumps(row)) + + def save_smtp_tls_report_to_gelf(self, smtp_tls_reports): + rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports) + for row in rows: + self.logger.info(json.dumps(row)) diff --git a/pyproject.toml b/pyproject.toml index fc478217..9a98018f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ dependencies = [ "tqdm>=4.31.1", "urllib3>=1.25.7", "xmltodict>=0.12.0", + "pygelf>=0.4.2", ] [project.scripts]